diff --git a/llm_agent1.py b/llm_agent1.py new file mode 100644 index 0000000..2d72299 --- /dev/null +++ b/llm_agent1.py @@ -0,0 +1,303 @@ + +from openai import OpenAI +from typing import Dict, Any +import asyncio +import psycopg2 +import os +from datetime import datetime +import json +import re +from bs4 import BeautifulSoup +from dotenv import load_dotenv + +# Load environment variables from .env +load_dotenv() + + +class LLMJobRefiner: + def __init__(self): + deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") + if not deepseek_api_key: + raise ValueError("DEEPSEEK_API_KEY not found in .env file.") + + # Database credentials from .env + self.db_url = os.getenv("DB_URL") + self.db_username = os.getenv("DB_USERNAME") + self.db_password = os.getenv("DB_PASSWORD") + self.db_host = os.getenv("DB_HOST") + self.db_port = os.getenv("DB_PORT") + + if not self.db_url or not self.db_username or not self.db_password: + raise ValueError("Database credentials not found in .env file.") + + # DeepSeek uses OpenAI-compatible API + self.client = OpenAI( + api_key=deepseek_api_key, + base_url="https://api.deepseek.com/v1" + ) + self.model = "deepseek-chat" + self._init_db() + + def _init_db(self): + """Initialize PostgreSQL database connection and create table""" + try: + self.db_url = os.getenv("DB_URL") + if self.db_url and "supabase.com" in self.db_url: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + else: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + cursor = conn.cursor() + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS crypto_jobs ( + id SERIAL PRIMARY KEY, + title TEXT, + company_name TEXT, + location TEXT, + description TEXT, + requirements TEXT, + qualifications TEXT, + salary_range TEXT, + nature_of_work TEXT, + job_id TEXT UNIQUE, + url TEXT, + category TEXT, + scraped_at TIMESTAMP, + posted_date TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) +''') + + # Ensure the uniqueness constraint exists + cursor.execute(''' + ALTER TABLE crypto_jobs DROP CONSTRAINT IF EXISTS crypto_jobs_job_id_key; + ALTER TABLE crypto_jobs ADD CONSTRAINT crypto_jobs_job_id_key UNIQUE (job_id); +''') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON crypto_jobs(job_id)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON crypto_jobs(category)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON crypto_jobs(posted_date)') + + conn.commit() + cursor.close() + conn.close() + print("āœ… PostgreSQL database initialized successfully") + except Exception as e: + print(f"āŒ Database initialization error: {e}") + raise + + def _clean_html_for_llm(self, html_content: str) -> str: + """Clean HTML to make it more readable for LLM while preserving structure""" + try: + soup = BeautifulSoup(html_content, 'html.parser') + + # Remove script and style elements + for script in soup(["script", "style", "nav", "footer", "header"]): + script.decompose() + + # Extract text but keep some structure + text = soup.get_text(separator=' ', strip=True) + + # Clean up whitespace + text = re.sub(r'\s+', ' ', text) + + # Limit length for LLM context + if len(text) > 10000: + text = text[:10000] + "..." 
+ + return text + except Exception as e: + print(f"HTML cleaning error: {e}") + # Fallback to raw content if cleaning fails + return html_content[:100000] if len(html_content) > 100000 else html_content + + def _generate_content_sync(self, prompt: str) -> str: + """Synchronous call to DeepSeek API""" + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.2, + max_tokens=2048, + stream=False + ) + return response.choices[0].message.content or "" + except Exception as e: + print(f"DeepSeek API error: {e}") + return "" + + async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]: + page_content = raw_data.get('page_content', '') + cleaned_content = self._clean_html_for_llm(page_content) + job_id = raw_data.get('job_id', 'unknown') + url = raw_data.get('url', 'N/A') + posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y")) + + prompt = f""" + You are a job posting data extractor. + + EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT. + + For these critical fields, follow these rules: + - description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists. + - requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist. + - qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist. + + REQUIRED FIELDS (must have valid values, never "N/A"): + - title, company_name, job_id, url + + OPTIONAL FIELDS (can be "Not provided"): + - location, salary_range, nature_of_work + + Page Content: + {cleaned_content} + + Response format (ONLY return this JSON): + {{ + "title": "...", + "company_name": "...", + "location": "...", + "description": "...", + "requirements": "...", + "qualifications": "...", + "salary_range": "...", + "nature_of_work": "...", + "job_id": "{job_id}", + "url": "{url}" + }} + """ + + try: + response_text = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self._generate_content_sync(prompt) + ) + refined_data = self._parse_llm_response(response_text) + + if not refined_data: + return None + + # Validate required fields + required_fields = ['title', 'company_name', 'job_id', 'url'] + for field in required_fields: + if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]: + return None + + # CRITICAL: Validate content fields - check if they SHOULD exist + content_fields = ['description', 'requirements', 'qualifications'] + cleaned_original = cleaned_content.lower() + + # Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided" + job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master'] + has_job_content = any(indicator in cleaned_original for indicator in job_indicators) + + if has_job_content: + for field in content_fields: + value = refined_data.get(field, "").strip() + if value in ["Not provided", "N/A", ""]: + # LLM failed to extract existing content + print(f" āš ļø LLM returned '{value}' for {field} but job content appears present") + return None + + # Add the posted_date to the refined data + refined_data['posted_date'] = posted_date + + return refined_data + 
+ except Exception as e: + print(f"LLM refinement failed: {str(e)}") + return None + + def _parse_llm_response(self, response_text: str) -> Dict[str, Any]: + json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL) + if not json_match: + json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + if not json_match: + return None + + try: + return json.loads(json_match.group(1) if '```' in response_text else json_match.group(0)) + except json.JSONDecodeError: + return None + + async def save_job_data(self, job_data: Dict[str, Any], keyword: str): + await self._save_to_db(job_data) + await self._save_to_markdown(job_data, keyword) + + async def _save_to_db(self, job_data: Dict[str, Any]): + """Save job data to PostgreSQL database with job_id uniqueness""" + try: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + cursor = conn.cursor() + + cursor.execute(''' + INSERT INTO crypto_jobs + (title, company_name, location, description, requirements, + qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (job_id) DO NOTHING +''', ( + job_data.get("title", "N/A"), + job_data.get("company_name", "N/A"), + job_data.get("location", "N/A"), + job_data.get("description", "N/A"), + job_data.get("requirements", "N/A"), + job_data.get("qualifications", "N/A"), + job_data.get("salary_range", "N/A"), + job_data.get("nature_of_work", "N/A"), + job_data.get("job_id", "N/A"), + job_data.get("url", "N/A"), + job_data.get("category", "N/A"), + job_data.get("scraped_at"), + job_data.get("posted_date", "N/A") + )) + + conn.commit() + cursor.close() + conn.close() + + print(f" šŸ’¾ Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}") + + except Exception as e: + print(f"āŒ Database save error: {e}") + + async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): + os.makedirs("linkedin_jobs", exist_ok=True) + filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md") + write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0 + + with open(filepath, "a", encoding="utf-8") as f: + if write_header: + f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n") + f.write(f"- **Keyword**: {keyword}\n") + f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n") + f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n") + f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n") + f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n") + f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n") + f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n") + f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n") + f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n") + f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n") + f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") + f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") + f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n") + f.write("---\n\n") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..bb7c637 --- /dev/null +++ b/main.py @@ -0,0 +1,54 @@ +from 
scraping_engine import FingerprintScrapingEngine +from scraper import CryptoJobScraper # Updated class name +import os +from dotenv import load_dotenv +import asyncio +import random +import time + +load_dotenv() + + +async def main(): + engine = FingerprintScrapingEngine( + seed="crypto_scraping_12", + target_os="windows", + db_path="crypto_jobs.db", + markdown_path="crypto_jobs.md" + ) + + scraper = CryptoJobScraper(engine, human_speed=1.3, user_request="Extract title, company, location, description, requirements, qualifications, nature of work, and salary") + + job_titles = [ + "Blockchain Engineer", + "Smart Contract Developer", + "DeFi Analyst", + "Web3 Developer", + "Crypto Researcher", + "Solidity Developer", + "Protocol Engineer", + "Tokenomics Specialist", + "Zero-Knowledge Proof Engineer", + "Crypto Compliance Officer" + ] + + while True: + random.shuffle(job_titles) + for job_title in job_titles: + search_keywords = job_title # No location param needed + + print(f"\n{'='*60}") + print(f"Starting scrape for: {search_keywords}") + print(f"{'='*60}") + + await scraper.scrape_jobs(search_keywords=search_keywords) + + print(f"\nāœ… Completed scraping for: {job_title}") + print(f"ā³ Waiting 90 seconds before next job title...") + time.sleep(90) + + print(f"\nāœ… Completed full cycle") + print(f"šŸ”„ Starting new cycle...") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/scraper.py b/scraper.py new file mode 100644 index 0000000..10bad5a --- /dev/null +++ b/scraper.py @@ -0,0 +1,253 @@ + +import asyncio +import random +from typing import Optional, Dict +from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError +from browserforge.injectors.playwright import AsyncNewContext +from llm_agent import LLMJobRefiner +import re +from fetcher import StealthyFetcher +from datetime import datetime +import json +import redis + + +class CryptoJobScraper: + def __init__( + self, + engine, + db_path: str = "crypto_jobs.db", + human_speed: float = 1.0, + user_request: str = "Extract all standard job details" + ): + self.engine = engine + self.db_path = db_path + self.human_speed = human_speed + self.user_request = user_request + self.llm_agent = LLMJobRefiner() + self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) + + async def _human_click(self, page, element, wait_after: bool = True): + if not element: + return False + await element.scroll_into_view_if_needed() + await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed) + try: + await element.click() + if wait_after: + await asyncio.sleep(random.uniform(2, 4) * self.human_speed) + return True + except: + return False + + async def _extract_page_content_for_llm(self, page) -> str: + await asyncio.sleep(2 * self.human_speed) + await self.engine._human_like_scroll(page) + await asyncio.sleep(2 * self.human_speed) + page_content = await page.content() + return page_content + + def _calculate_keyword_match(self, title: str, keywords: str) -> float: + if not title or not keywords: + return 0.0 + title_lower = title.lower() + keyword_list = [kw.strip().lower() for kw in keywords.split()] + matches = sum(1 for kw in keyword_list if kw in title_lower) + return matches / len(keyword_list) if keyword_list else 0.0 + + async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links): + current_links = await page.query_selector_all("a[href*='/job/']") + new_jobs = 0 + + for link in current_links: + href = 
await link.get_attribute("href") + if not href or not href.startswith("http"): + href = "https://cryptocurrencyjobs.co" + href + job_id = href.split("/")[-1] if href.endswith("/") else href.split("/")[-1] + + if job_id and job_id not in seen_job_ids: + title_element = await link.query_selector("h3, .job-title") + title = (await title_element.inner_text()) if title_element else "Unknown Title" + match_percentage = self._calculate_keyword_match(title, search_keywords) + + if match_percentage >= 0.5: # Lower threshold than LinkedIn + seen_job_ids.add(job_id) + all_job_links.append((href, title)) + new_jobs += 1 + else: + print(f" āš ļø Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})") + return new_jobs + + async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links): + current_page = 1 + while True: + print(f"šŸ“„ Processing page {current_page}") + new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) + print(f" āž• Found {new_jobs} new job(s) (total: {len(all_job_links)})") + + next_btn = await page.query_selector('a[rel="next"]') + if next_btn: + next_url = await next_btn.get_attribute("href") + if next_url and not next_url.startswith("http"): + next_url = "https://cryptocurrencyjobs.co" + next_url + await page.goto(next_url, timeout=120000) + await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) + current_page += 1 + else: + print("šŸ”š No 'Next' page — stopping pagination.") + break + + async def _extract_job_posted_date(self, page) -> str: + try: + date_element = await page.query_selector(".job-posted-date, .job-date, time") + if date_element: + date_text = await date_element.inner_text() + if "Today" in date_text: + return datetime.now().strftime("%m/%d/%y") + elif "Yesterday" in date_text: + yesterday = datetime.now().replace(day=datetime.now().day - 1) + return yesterday.strftime("%m/%d/%y") + else: + return datetime.now().strftime("%m/%d/%y") + except: + pass + return datetime.now().strftime("%m/%d/%y") + + async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str): + try: + job_data = { + "job_url": job_url, + "job_id": job_id, + "error_type": error_type, + "timestamp": datetime.now().isoformat() + } + self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data)) + print(f" šŸ“¦ Added failed job to Redis cache: {job_id} (Error: {error_type})") + except Exception as e: + print(f" āŒ Failed to add job to Redis cache: {str(e)}") + + async def scrape_jobs( + self, + search_keywords: Optional[str], + max_pages: int = 1, + credentials: Optional[Dict] = None + ): + # cryptocurrencyjobs.co uses URL params differently + encoded_keywords = search_keywords.replace(" ", "%20") + search_url = f"https://cryptocurrencyjobs.co/?q={encoded_keywords}" + + profile = self.engine._select_profile() + renderer = random.choice(self.engine.common_renderers[self.engine.os]) + vendor = random.choice(self.engine.common_vendors) + spoof_script = self.engine._get_spoof_script(renderer, vendor) + + async with async_playwright() as pw: + browser = await pw.chromium.launch( + headless=False, + args=['--disable-blink-features=AutomationControlled'] + ) + context = await AsyncNewContext(browser, fingerprint=profile) + + await context.add_init_script(f""" + Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); + Object.defineProperty(navigator, 'deviceMemory', {{ get: () => 
{profile.navigator.deviceMemory} }}); + Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }}); + """) + await context.add_init_script(spoof_script) + + page = await context.new_page() + + # Fetch main search page + print(f"šŸ” Searching for: {search_keywords}") + await page.goto(search_url, wait_until='load', timeout=120000) + await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) + + all_job_links = [] + seen_job_ids = set() + + print("šŸ”„ Collecting job links from search results...") + await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) + await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links) + + print(f"āœ… Collected {len(all_job_links)} unique job links.") + + scraped_count = 0 + for idx, (href, title) in enumerate(all_job_links): + try: + full_url = href + print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}") + + fetcher = StealthyFetcher(self.engine, browser, context) + job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1") + if not job_page: + print(f" āŒ Failed to fetch job page {full_url}") + await self._add_job_to_redis_cache(full_url, full_url.split("/")[-1], "fetch_failure") + self.engine.report_outcome("fetch_failure", url=full_url) + continue + + posted_date = await self._extract_job_posted_date(job_page) + + await self.engine._human_like_scroll(job_page) + await asyncio.sleep(2 * self.human_speed) + page_content = await self._extract_page_content_for_llm(job_page) + + job_id = full_url.split("/")[-1] if full_url.split("/")[-1] else "unknown" + + raw_data = { + "page_content": page_content, + "url": full_url, + "job_id": job_id, + "search_keywords": search_keywords, + "posted_date": posted_date + } + + refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request) + + if refined_data and refined_data.get("title", "N/A") != "N/A": + compulsory_fields = ['company_name', 'job_id', 'url'] + for field in compulsory_fields: + if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]: + if field == 'job_id': + refined_data[field] = job_id + elif field == 'url': + refined_data[field] = full_url + elif field == 'company_name': + refined_data[field] = "Unknown Company" + + refined_data['scraped_at'] = datetime.now().isoformat() + refined_data['category'] = search_keywords + refined_data['posted_date'] = posted_date + await self.llm_agent.save_job_data(refined_data, search_keywords) + scraped_count += 1 + print(f" āœ… Scraped and refined: {refined_data['title'][:50]}...") + self.engine.report_outcome("success", url=raw_data["url"]) + else: + print(f" 🟔 Could not extract meaningful data from: {full_url}") + await self._add_job_to_redis_cache(full_url, job_id, "llm_failure") + self.engine.report_outcome("llm_failure", url=raw_data["url"]) + + await job_page.close() + + except Exception as e: + error_msg = str(e)[:100] + print(f" āš ļø Failed on job {idx+1}: {error_msg}") + job_id = full_url.split("/")[-1] if 'full_url' in locals() else "unknown" + job_url = full_url if 'full_url' in locals() else "unknown" + await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}") + if 'job_page' in locals() and job_page: + await job_page.close() + continue + + finally: + print(" ā†©ļø Returning to search results...") + await page.goto(search_url, timeout=120000) + await asyncio.sleep(4 * self.human_speed) + + await browser.close() + + if scraped_count > 0: + self.engine.report_outcome("success") 
+ print(f"āœ… Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.") + else: + self.engine.report_outcome("scraping_error") + print("āš ļø No jobs processed successfully.")