Web_scraping_project/llm_agent.py
Ofure Ikheloa fd4e8c9c05 feat(scraper): add LLM-powered job data refinement and new scraping logic
- Implement LLMJobRefiner class for processing job data with Gemini API
- Add new job_scraper2.py with enhanced scraping capabilities
- Remove search_keywords parameter from scraping engine
- Add environment variable loading in config.py
- Update main script to use new scraper and target field
2025-11-24 12:25:50 +01:00

import google.generativeai as genai
from typing import Dict, Any, Optional
import asyncio
import json
import re
import sqlite3
import os
from datetime import datetime
from config import GEMINI_API_KEY
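
# The commit message notes that config.py now handles environment-variable
# loading; a minimal sketch of what that module might contain (an assumption,
# the actual file is not shown here):
#
#     # config.py
#     import os
#     from dotenv import load_dotenv  # hypothetical dependency: python-dotenv
#
#     load_dotenv()  # read a local .env file into the environment, if present
#     GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")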

class LLMJobRefiner:
    def __init__(self):
        genai.configure(api_key=GEMINI_API_KEY)
        self.model = genai.GenerativeModel('gemini-pro')

    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        """
        Refine raw job data using the Gemini LLM, guided by the target field.
        Returns None if the LLM call or response parsing fails.
        """
        # Truncate the page content so the prompt stays within a safe size.
        page_content = raw_data.get('page_content', '')[:3000]
        prompt = f"""
You are a job data extraction assistant. Extract the following fields from the job posting:
- title
- company_name
- location
- description
- requirements
- qualifications
- salary_range
- nature_of_work (remote, onsite, or hybrid)
- job_id

Target Field: {target_field}

Raw Page Content:
{page_content}

Instructions:
1. Extract only the information relevant to the target field: {target_field}
2. Clean up any formatting issues in the description
3. Standardize location format (city, state/country)
4. Extract salary range if mentioned in description
5. Determine nature of work (remote, onsite, or hybrid) from work arrangements
6. Ensure all fields are properly formatted
7. If a field cannot be found, use "N/A"
8. Return the refined data in JSON format

Response format (only return the JSON):
{{
    "title": "...",
    "company_name": "...",
    "location": "...",
    "description": "...",
    "requirements": "...",
    "qualifications": "...",
    "salary_range": "...",
    "nature_of_work": "...",
    "job_id": "{raw_data.get('job_id', 'unknown')}",
    "url": "{raw_data.get('url', 'N/A')}"
}}
"""
        try:
            # generate_content is a blocking call, so run it in a worker
            # thread to keep the event loop responsive.
            response = await asyncio.to_thread(self.model.generate_content, prompt)
            # Parse the response into a dict; None means parsing failed.
            refined_data = self._parse_llm_response(response.text)
            if not refined_data:
                return None
            return refined_data
        except Exception as e:
            print(f"LLM refinement failed: {e}")
            return None

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """
        Parse the LLM response and extract the refined job data.
        Returns None if no JSON object can be found or decoded.
        """
        # Extract JSON from the response, handling markdown code blocks.
        json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if json_match:
            json_str = json_match.group(1)
        else:
            # No code block; try to find a JSON object directly.
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                json_str = json_match.group(0)
            else:
                return None
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            return None

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        """
        Save job data to both the database and the markdown log.
        """
        # Save to database
        await self._save_to_db(job_data)
        # Save to markdown
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """
        Save job data to the SQLite database.
        """
        db_path = "linkedin_jobs.db"
        # Create the parent directory only if the path actually has one.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            # Create the table on first use so the INSERT below cannot fail on
            # a fresh database. The schema is inferred from the INSERT columns
            # (the scraper may also create it elsewhere); job_id is UNIQUE so
            # that INSERT OR IGNORE actually deduplicates postings.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS jobs (
                    title TEXT,
                    company_name TEXT,
                    location TEXT,
                    description TEXT,
                    requirements TEXT,
                    qualifications TEXT,
                    salary_range TEXT,
                    nature_of_work TEXT,
                    job_id TEXT UNIQUE,
                    url TEXT
                )
            ''')
            cursor.execute('''
                INSERT OR IGNORE INTO jobs
                (title, company_name, location, description, requirements,
                 qualifications, salary_range, nature_of_work, job_id, url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                job_data.get("title", "N/A"),
                job_data.get("company_name", "N/A"),
                job_data.get("location", "N/A"),
                job_data.get("description", "N/A"),
                job_data.get("requirements", "N/A"),
                job_data.get("qualifications", "N/A"),
                job_data.get("salary_range", "N/A"),
                job_data.get("nature_of_work", "N/A"),
                job_data.get("job_id", "N/A"),
                job_data.get("url", "N/A")
            ))
            conn.commit()

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        """
        Append job data to a markdown file.
        """
        os.makedirs("linkedin_jobs", exist_ok=True)
        # All jobs are appended to a single markdown file.
        filename = "linkedin_jobs_scraped.md"
        filepath = os.path.join("linkedin_jobs", filename)
        with open(filepath, "a", encoding="utf-8") as f:
            # Only write the top-level header if the file is empty.
            if os.path.getsize(filepath) == 0:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
            f.write(f"- **Keyword**: {keyword}\n")
            f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n")
            f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n")
            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
            f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
            f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
            f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
            f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
            f.write("---\n\n")