from openai import OpenAI
from typing import Dict, Any, Optional
import asyncio
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()


class LLMJobRefiner:
    def __init__(self):
        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
        if not deepseek_api_key:
            raise ValueError("DEEPSEEK_API_KEY not found in .env file.")

        # Database credentials from .env
        self.db_username = os.getenv("DB_USERNAME")
        self.db_password = os.getenv("DB_PASSWORD")
        self.db_host = os.getenv("DB_HOST")
        self.db_port = os.getenv("DB_PORT")

        if not self.db_username or not self.db_password:
            raise ValueError("Database credentials not found in .env file.")

        # DeepSeek exposes an OpenAI-compatible API
        self.client = OpenAI(
            api_key=deepseek_api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.model = "deepseek-chat"
        self._init_db()

    def _init_db(self):
        """Initialize PostgreSQL database connection and create table"""
        try:
            conn = psycopg2.connect(
                host=self.db_host,
                port=self.db_port,
                database="postgres",
                user=self.db_username,
                password=self.db_password
            )
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS crypto_jobs (
                    id SERIAL PRIMARY KEY,
                    title TEXT,
                    company_name TEXT,
                    location TEXT,
                    description TEXT,
                    requirements TEXT,
                    qualifications TEXT,
                    salary_range TEXT,
                    nature_of_work TEXT,
                    job_id TEXT UNIQUE,
                    url TEXT,
                    category TEXT,
                    scraped_at TIMESTAMP,
                    posted_date TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Ensure the uniqueness constraint exists (covers tables created
            # before job_id was declared UNIQUE)
            cursor.execute('''
                ALTER TABLE crypto_jobs DROP CONSTRAINT IF EXISTS crypto_jobs_job_id_key;
                ALTER TABLE crypto_jobs ADD CONSTRAINT crypto_jobs_job_id_key UNIQUE (job_id);
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON crypto_jobs(job_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON crypto_jobs(category)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON crypto_jobs(posted_date)')
            conn.commit()
            cursor.close()
            conn.close()
            print("✅ PostgreSQL database initialized successfully")
        except Exception as e:
            print(f"❌ Database initialization error: {e}")
            raise

    def _clean_html_for_llm(self, html_content: str) -> str:
        """Clean HTML to make it more readable for LLM while preserving structure"""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Remove script, style, and page-chrome elements
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()

            # Extract text but keep some structure
            text = soup.get_text(separator=' ', strip=True)

            # Clean up whitespace
            text = re.sub(r'\s+', ' ', text)

            # Limit length for LLM context
            if len(text) > 100000:
                text = text[:100000] + "..."
            return text
        except Exception as e:
            print(f"HTML cleaning error: {e}")
            # Fallback to raw content if cleaning fails
            return html_content[:100000] if len(html_content) > 100000 else html_content

    def _generate_content_sync(self, prompt: str) -> str:
        """Synchronous call to DeepSeek API"""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=2048,
                stream=False
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            print(f"DeepSeek API error: {e}")
            return ""

    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        page_content = raw_data.get('page_content', '')
        cleaned_content = self._clean_html_for_llm(page_content)
        job_id = raw_data.get('job_id', 'unknown')
        url = raw_data.get('url', 'N/A')
        posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))

        prompt = f"""
You are an expert job posting data extractor. Your task is to extract AND infer fields from the provided job posting.

### CORE RULES:
1. **NEVER invent, summarize, or paraphrase** — extract **exact wording** when available.
2. **For critical fields (title, company_name, job_id, url, description):**
   - These MUST be present and meaningful.
   - If not explicitly stated, **infer from context** (e.g., page title, headings, "About Us", etc.).
   - **NEVER return "Not provided" or "N/A" for these fields.**
3. **For optional fields (location, salary_range, etc.):**
   - Extract exact text if present.
   - If absent but inferable (e.g., "Remote (US)", "Full-time"), **infer it**.
   - Only return "Not provided" if truly absent and non-inferable.

### FIELD DEFINITIONS:
- **title**: The job title. Look in <h1> headings, page title, or bold headings.
- **company_name**: Company name. Look in logo alt text, footer, "About [X]", or page title.
- **description**: Main job overview, responsibilities, or duties. Combine relevant paragraphs if needed. **Must not be empty.**
- **requirements**: Required skills, experience, or qualifications.
- **qualifications**: Educational or certification requirements.
- **location**: Office location or remote policy.
- **salary_range**: Exact compensation info.
- **nature_of_work**: Employment type (Full-time, Contract, Internship, etc.).

### OUTPUT FORMAT:
Return ONLY a valid JSON object with these keys:
{{
    "title": "...",
    "company_name": "...",
    "location": "...",
    "description": "...",
    "requirements": "...",
    "qualifications": "...",
    "salary_range": "...",
    "nature_of_work": "...",
    "job_id": "{job_id}",
    "url": "{url}"
}}

- **Critical fields must NEVER be "Not provided", "N/A", empty, or generic** (e.g., "Company", "Job Title").
- **Optional fields may be "Not provided" ONLY if truly absent.**
- **Do not include markdown, explanations, or extra text.**
- **Use double quotes for JSON.**

Page Content:
{cleaned_content}
"""

        try:
            # Run the blocking API call in a worker thread so the event loop
            # stays responsive
            response_text = await asyncio.get_running_loop().run_in_executor(
                None, self._generate_content_sync, prompt
            )

            refined_data = self._parse_llm_response(response_text)
            if not refined_data:
                return None

            # Validate critical fields — reject if missing or placeholder.
            # Coerce to str first: the LLM may return non-string values.
            critical_fields = ['title', 'company_name', 'job_id', 'url', 'description']
            for field in critical_fields:
                value = str(refined_data.get(field, "") or "").strip()
                if not value or value.lower() in ["n/a", "not provided", "unknown", "company", "job", "title"]:
                    print(f"   ❌ Critical field '{field}' is invalid: '{value}'")
                    return None  # This job will NOT be saved — as per requirement

            # Optional fields: allow "Not provided", but ensure they're strings
            optional_fields = ['location', 'requirements', 'qualifications', 'salary_range', 'nature_of_work']
            for field in optional_fields:
                if field not in refined_data:
                    refined_data[field] = "Not provided"
                elif not isinstance(refined_data[field], str):
                    refined_data[field] = str(refined_data[field])

            refined_data['posted_date'] = posted_date
            return refined_data
        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        # Try to extract JSON from a markdown code block first
        json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if json_match:
            json_str = json_match.group(1)
        else:
            # Fall back to finding a raw JSON object; this pattern has no
            # capture group, so use group(0)
            json_match = re.search(r'\{[^{}]*\{[^{}]*\}[^{}]*\}|\{.*\}', response_text, re.DOTALL)
            if not json_match:
                return None
            json_str = json_match.group(0)

        try:
            # Clean common issues: collapse whitespace (raw newlines are
            # invalid inside JSON strings) and strip trailing commas
            json_str = re.sub(r'\s+', ' ', json_str)
            json_str = re.sub(r',\s*([\]}\)])', r'\1', json_str)
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            print(f"JSON parsing error: {e}")
            return None

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """Save job data to PostgreSQL database with job_id uniqueness"""
        try:
            conn = psycopg2.connect(
                host=self.db_host,
                port=self.db_port,
                database="postgres",
                user=self.db_username,
                password=self.db_password
            )
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO crypto_jobs
                (title, company_name, location, description, requirements, qualifications,
                 salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (job_id) DO NOTHING
            ''', (
                job_data.get("title", "Not provided"),
                job_data.get("company_name", "Not provided"),
                job_data.get("location", "Not provided"),
                job_data.get("description", "Not provided"),
                job_data.get("requirements", "Not provided"),
                job_data.get("qualifications", "Not provided"),
                job_data.get("salary_range", "Not provided"),
                job_data.get("nature_of_work", "Not provided"),
                job_data.get("job_id", "unknown"),
                job_data.get("url", "N/A"),
                job_data.get("category", "all"),
                job_data.get("scraped_at"),
                job_data.get("posted_date", datetime.now().strftime("%m/%d/%y"))
            ))
            conn.commit()
            cursor.close()
            conn.close()
            print(f"   💾 Saved job to category '{job_data.get('category', 'all')}' with job_id: {job_data.get('job_id', 'unknown')}")
        except Exception as e:
            print(f"❌ Database save error: {e}")

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        os.makedirs("crypto_jobs", exist_ok=True)
        filepath = os.path.join("crypto_jobs", "crypto_jobs_scraped.md")
        write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0

        with open(filepath, "a", encoding="utf-8") as f:
            if write_header:
                f.write(f"# Crypto Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"## Job: {job_data.get('title', 'Not provided')}\n\n")
            f.write(f"- **Keyword**: {keyword}\n")
            f.write(f"- **Company**: {job_data.get('company_name', 'Not provided')}\n")
            f.write(f"- **Location**: {job_data.get('location', 'Not provided')}\n")
            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'Not provided')}\n")
            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'Not provided')}\n")
            f.write(f"- **Job ID**: {job_data.get('job_id', 'unknown')}\n")
            f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n")
            f.write(f"- **Category**: {job_data.get('category', 'all')}\n")
            f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n")
            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
            f.write(f"### Description\n\n{job_data.get('description', 'Not provided')}\n\n")
            f.write(f"### Requirements\n\n{job_data.get('requirements', 'Not provided')}\n\n")
            f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'Not provided')}\n\n")
            f.write("---\n\n")
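

# --- Minimal usage sketch (illustrative only) ---
# The scraper that produces `raw_data` is not part of this module; the dict
# below is a hypothetical example of the shape refine_job_data() expects:
# `page_content` (raw HTML) plus optional `job_id`, `url`, and `posted_date`.
# Running this requires DEEPSEEK_API_KEY and the DB_* variables in .env.
if __name__ == "__main__":
    async def _demo():
        refiner = LLMJobRefiner()
        raw_data = {
            "page_content": "<html><h1>Senior Rust Engineer</h1>"
                            "<p>ExampleDAO is hiring. Remote (US). $150k-$180k.</p></html>",
            "job_id": "demo-123",                       # placeholder ID
            "url": "https://example.com/jobs/demo-123"  # placeholder URL
        }
        refined = await refiner.refine_job_data(raw_data, target_field="all")
        if refined:
            refined["category"] = "engineering"
            refined["scraped_at"] = datetime.now()
            await refiner.save_job_data(refined, keyword="rust")
        else:
            print("Job rejected: critical fields missing or placeholder.")

    asyncio.run(_demo())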