from openai import OpenAI
from typing import Dict, Any, Optional
import asyncio
import sqlite3
import os
from datetime import datetime
import json
import re
from dotenv import load_dotenv

# ✅ Actually load .env
load_dotenv()


class LLMJobRefiner:
    def __init__(self):
        xai_api_key = os.getenv("XAI_API_KEY")
        if not xai_api_key:
            raise ValueError("XAI_API_KEY not found in environment variables.")
        self.client = OpenAI(api_key=xai_api_key, base_url="https://api.x.ai/v1")
        self.model = "grok-4-latest"
        # Cache generated schemas so repeated identical requests skip an extra LLM call.
        self.extraction_schema_cache: Dict[str, Dict[str, str]] = {}

    def generate_content(self, prompt: str, system_message: str = "You are a helpful assistant.") -> str:
        """Synchronous method to call Grok via the xAI API."""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.2,
                max_tokens=2048,
                stream=False
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            print(f"Error in Grok API call: {e}")
            return ""

    async def refine_job_data(self, raw_data: Dict[str, Any], user_request: str) -> Optional[Dict[str, Any]]:
        """Extract the fields the user asked for from a raw scraped page."""
        page_content = raw_data.get('page_content', '')
        if not page_content:
            return None

        schema_key = user_request.lower().strip()
        extraction_schema = self.extraction_schema_cache.get(schema_key)
        if not extraction_schema:
            extraction_schema = await self._generate_extraction_schema(user_request)
            if extraction_schema:
                self.extraction_schema_cache[schema_key] = extraction_schema
            else:
                extraction_schema = self._get_default_schema()

        prompt = f"""
You are a highly skilled web data extraction assistant. Your task is to analyze the raw HTML content of a job posting page and extract specific information requested by the user.

The user's request is: "{user_request}"

The raw HTML content of the page is provided below (limited in size). The content might be noisy or unstructured. Your goal is to:
1. Analyze the HTML structure to identify relevant sections.
2. Extract the requested information accurately.
3. Clean up formatting issues.
4. If a field cannot be found, use "N/A".
5. Return ONLY the extracted data in a JSON object based on the following schema:
{json.dumps(extraction_schema, indent=2)}

Raw Page Content (HTML):
{page_content[:6000]}

Respond with the JSON object containing the extracted data.
"""

        try:
            # ✅ Use self (current instance), NOT a new LLMJobRefiner().
            # Run the blocking API call in a worker thread so the event loop stays free.
            response_text = await asyncio.to_thread(self.generate_content, prompt)

            refined_data = self._parse_llm_response(response_text)
            if not refined_data:
                return None

            refined_data['job_id'] = raw_data.get('job_id', 'unknown')
            refined_data['url'] = raw_data.get('url', 'N/A')
            return refined_data
        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None

    async def _generate_extraction_schema(self, user_request: str) -> Optional[Dict[str, str]]:
        """Ask the model to turn the user's request into a field -> description schema."""
        schema_prompt = f"""
Based on the user's request: "{user_request}", generate a JSON schema for the data they want to extract from a job posting.
The schema should be a dictionary where keys are field names (snake_case) and values are short descriptions.
Include standard fields like title, company_name, location, description, etc., if relevant.
Respond with only the JSON schema.
"""
        try:
            # ✅ Use self.generate_content, NOT self.model.generate_content
            schema_text = await asyncio.to_thread(self.generate_content, schema_prompt)

            # Prefer JSON inside a ```json fence; fall back to the first bare object.
            # (The fenced pattern has a capture group; the bare one does not, so
            # track which regex matched instead of calling group(1) blindly.)
            fenced = re.search(r'```(?:json)?\s*({.*?})\s*```', schema_text, re.DOTALL)
            if fenced:
                json_str = fenced.group(1)
            else:
                bare = re.search(r'\{.*\}', schema_text, re.DOTALL)
                if not bare:
                    return None
                json_str = bare.group(0)
            return json.loads(json_str)
        except Exception as e:
            print(f"Schema generation failed: {str(e)}")
            return None

    def _get_default_schema(self) -> Dict[str, str]:
        return {
            "title": "The job title",
            "company_name": "The name of the company",
            "location": "The location of the job",
            "description": "The full job description",
            "requirements": "List of job requirements",
            "qualifications": "List of required qualifications",
            "salary_range": "The salary range mentioned",
            "nature_of_work": "Remote, onsite, or hybrid"
        }

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """Pull the first JSON object out of the model's reply, fenced or bare."""
        fenced = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if fenced:
            json_str = fenced.group(1)
        else:
            bare = re.search(r'\{.*\}', response_text, re.DOTALL)
            if not bare:
                return None
            json_str = bare.group(0)
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            return None

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        db_path = "linkedin_jobs.db"
        os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            fields = list(job_data.keys())
            placeholders = ', '.join(['?' for _ in fields])
            columns = ', '.join([f'"{col}"' for col in fields])  # Quote column names

            cursor.execute(f"CREATE TABLE IF NOT EXISTS jobs ({columns})")
            # The table is created from the first record's keys; add any columns
            # that later records introduce so inserts with a wider schema don't fail.
            existing = {row[1] for row in cursor.execute("PRAGMA table_info(jobs)")}
            for col in fields:
                if col not in existing:
                    cursor.execute(f'ALTER TABLE jobs ADD COLUMN "{col}"')

            cursor.execute(f'INSERT INTO jobs ({columns}) VALUES ({placeholders})',
                           [job_data.get(field, 'N/A') for field in fields])
            conn.commit()

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        os.makedirs("linkedin_jobs", exist_ok=True)
        filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md")
        write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0

        with open(filepath, "a", encoding="utf-8") as f:
            if write_header:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
            f.write(f"- **Keyword**: {keyword}\n")
            for key, value in job_data.items():
                if key != 'title':
                    f.write(f"- **{key.replace('_', ' ').title()}**: {value}\n")
            f.write("\n---\n\n")