Compare commits


No commits in common. "1005dfc041efbde8b389f10510473aec691c4a96" and "37da7b2c1a63268fa7c321dcc45c2f9c54f03c91" have entirely different histories.

3 changed files with 0 additions and 610 deletions

llm_agent.py

@@ -1,303 +0,0 @@
from openai import OpenAI
from typing import Dict, Any, Optional
import asyncio
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()


class LLMJobRefiner:
    def __init__(self):
        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
        if not deepseek_api_key:
            raise ValueError("DEEPSEEK_API_KEY not found in .env file.")

        # Database credentials from .env
        self.db_url = os.getenv("DB_URL")
        self.db_username = os.getenv("DB_USERNAME")
        self.db_password = os.getenv("DB_PASSWORD")
        self.db_host = os.getenv("DB_HOST")
        self.db_port = os.getenv("DB_PORT")
        if not self.db_url or not self.db_username or not self.db_password:
            raise ValueError("Database credentials not found in .env file.")

        # DeepSeek exposes an OpenAI-compatible API
        self.client = OpenAI(
            api_key=deepseek_api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.model = "deepseek-chat"
        self._init_db()
    def _init_db(self):
        """Initialize PostgreSQL database connection and create table"""
        try:
            # Supabase and self-hosted Postgres use the same connection parameters here
            conn = psycopg2.connect(
                host=self.db_host,
                port=self.db_port,
                database="postgres",
                user=self.db_username,
                password=self.db_password
            )
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS crypto_jobs (
                    id SERIAL PRIMARY KEY,
                    title TEXT,
                    company_name TEXT,
                    location TEXT,
                    description TEXT,
                    requirements TEXT,
                    qualifications TEXT,
                    salary_range TEXT,
                    nature_of_work TEXT,
                    job_id TEXT UNIQUE,
                    url TEXT,
                    category TEXT,
                    scraped_at TIMESTAMP,
                    posted_date TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Ensure the uniqueness constraint exists
            cursor.execute('''
                ALTER TABLE crypto_jobs DROP CONSTRAINT IF EXISTS crypto_jobs_job_id_key;
                ALTER TABLE crypto_jobs ADD CONSTRAINT crypto_jobs_job_id_key UNIQUE (job_id);
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON crypto_jobs(job_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON crypto_jobs(category)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON crypto_jobs(posted_date)')
            conn.commit()
            cursor.close()
            conn.close()
            print("✅ PostgreSQL database initialized successfully")
        except Exception as e:
            print(f"❌ Database initialization error: {e}")
            raise
    def _clean_html_for_llm(self, html_content: str) -> str:
        """Clean HTML to make it more readable for the LLM while preserving structure"""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            # Remove script, style, and navigation elements
            for script in soup(["script", "style", "nav", "footer", "header"]):
                script.decompose()
            # Extract text but keep some structure
            text = soup.get_text(separator=' ', strip=True)
            # Clean up whitespace
            text = re.sub(r'\s+', ' ', text)
            # Limit length for the LLM context
            if len(text) > 10000:
                text = text[:10000] + "..."
            return text
        except Exception as e:
            print(f"HTML cleaning error: {e}")
            # Fall back to truncated raw content if cleaning fails
            return html_content[:100000] if len(html_content) > 100000 else html_content
    def _generate_content_sync(self, prompt: str) -> str:
        """Synchronous call to the DeepSeek API"""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=2048,
                stream=False
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            print(f"DeepSeek API error: {e}")
            return ""
    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        page_content = raw_data.get('page_content', '')
        cleaned_content = self._clean_html_for_llm(page_content)
        job_id = raw_data.get('job_id', 'unknown')
        url = raw_data.get('url', 'N/A')
        posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))

        prompt = f"""
You are a job posting data extractor.
EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT.

For these critical fields, follow these rules:
- description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists.
- requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist.
- qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist.

REQUIRED FIELDS (must have valid values, never "N/A"):
- title, company_name, job_id, url

OPTIONAL FIELDS (can be "Not provided"):
- location, salary_range, nature_of_work

Page Content:
{cleaned_content}

Response format (ONLY return this JSON):
{{
    "title": "...",
    "company_name": "...",
    "location": "...",
    "description": "...",
    "requirements": "...",
    "qualifications": "...",
    "salary_range": "...",
    "nature_of_work": "...",
    "job_id": "{job_id}",
    "url": "{url}"
}}
"""
        try:
            response_text = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self._generate_content_sync(prompt)
            )
            refined_data = self._parse_llm_response(response_text)
            if not refined_data:
                return None

            # Validate required fields
            required_fields = ['title', 'company_name', 'job_id', 'url']
            for field in required_fields:
                if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]:
                    return None

            # CRITICAL: Validate content fields - check whether they SHOULD exist
            content_fields = ['description', 'requirements', 'qualifications']
            cleaned_original = cleaned_content.lower()
            # Simple heuristic: if the page contains job-related keywords, content fields should NOT be "Not provided"
            job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master']
            has_job_content = any(indicator in cleaned_original for indicator in job_indicators)
            if has_job_content:
                for field in content_fields:
                    value = refined_data.get(field, "").strip()
                    if value in ["Not provided", "N/A", ""]:
                        # The LLM failed to extract content that appears to exist
                        print(f" ⚠️ LLM returned '{value}' for {field} but job content appears present")
                        return None

            # Add the posted_date to the refined data
            refined_data['posted_date'] = posted_date
            return refined_data
        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None
    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        # Prefer JSON inside a fenced code block, then fall back to the first bare JSON object
        json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if json_match:
            candidate = json_match.group(1)
        else:
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if not json_match:
                return None
            candidate = json_match.group(0)
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            return None
    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """Save job data to PostgreSQL database with job_id uniqueness"""
        try:
            conn = psycopg2.connect(
                host=self.db_host,
                port=self.db_port,
                database="postgres",
                user=self.db_username,
                password=self.db_password
            )
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO crypto_jobs
                (title, company_name, location, description, requirements,
                 qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (job_id) DO NOTHING
            ''', (
                job_data.get("title", "N/A"),
                job_data.get("company_name", "N/A"),
                job_data.get("location", "N/A"),
                job_data.get("description", "N/A"),
                job_data.get("requirements", "N/A"),
                job_data.get("qualifications", "N/A"),
                job_data.get("salary_range", "N/A"),
                job_data.get("nature_of_work", "N/A"),
                job_data.get("job_id", "N/A"),
                job_data.get("url", "N/A"),
                job_data.get("category", "N/A"),
                job_data.get("scraped_at"),
                job_data.get("posted_date", "N/A")
            ))
            conn.commit()
            cursor.close()
            conn.close()
            print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}")
        except Exception as e:
            print(f"❌ Database save error: {e}")
    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        os.makedirs("linkedin_jobs", exist_ok=True)
        filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md")
        write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0
        with open(filepath, "a", encoding="utf-8") as f:
            if write_header:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
            f.write(f"- **Keyword**: {keyword}\n")
            f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n")
            f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n")
            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
            f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
            f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n")
            f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n")
            f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n")
            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
            f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
            f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
            f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
            f.write("---\n\n")

main.py

@@ -1,54 +0,0 @@
from scraping_engine import FingerprintScrapingEngine
from scraper import CryptoJobScraper
from dotenv import load_dotenv
import asyncio
import random

load_dotenv()


async def main():
    engine = FingerprintScrapingEngine(
        seed="crypto_scraping_12",
        target_os="windows",
        db_path="crypto_jobs.db",
        markdown_path="crypto_jobs.md"
    )
    scraper = CryptoJobScraper(
        engine,
        human_speed=1.3,
        user_request="Extract title, company, location, description, requirements, qualifications, nature of work, and salary"
    )

    job_titles = [
        "Blockchain Engineer",
        "Smart Contract Developer",
        "DeFi Analyst",
        "Web3 Developer",
        "Crypto Researcher",
        "Solidity Developer",
        "Protocol Engineer",
        "Tokenomics Specialist",
        "Zero-Knowledge Proof Engineer",
        "Crypto Compliance Officer"
    ]

    while True:
        random.shuffle(job_titles)
        for job_title in job_titles:
            search_keywords = job_title  # No location parameter needed
            print(f"\n{'='*60}")
            print(f"Starting scrape for: {search_keywords}")
            print(f"{'='*60}")
            await scraper.scrape_jobs(search_keywords=search_keywords)
            print(f"\n✅ Completed scraping for: {job_title}")
            print("⏳ Waiting 90 seconds before next job title...")
            # asyncio.sleep instead of time.sleep so the event loop is not blocked
            await asyncio.sleep(90)
        print("\n✅ Completed full cycle")
        print("🔄 Starting new cycle...")


if __name__ == "__main__":
    asyncio.run(main())

scraper.py

@@ -1,253 +0,0 @@
import asyncio
import random
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from fetcher import StealthyFetcher
from datetime import datetime, timedelta
import json
import redis


class CryptoJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "crypto_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content
    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0

    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_links = await page.query_selector_all("a[href*='/job/']")
        new_jobs = 0
        for link in current_links:
            href = await link.get_attribute("href")
            if not href:
                continue
            if not href.startswith("http"):
                href = "https://cryptocurrencyjobs.co" + href
            # Strip any trailing slash so the job id is never an empty string
            job_id = href.rstrip("/").split("/")[-1]
            if job_id and job_id not in seen_job_ids:
                title_element = await link.query_selector("h3, .job-title")
                title = (await title_element.inner_text()) if title_element else "Unknown Title"
                match_percentage = self._calculate_keyword_match(title, search_keywords)
                if match_percentage >= 0.5:  # Lower threshold than LinkedIn
                    seen_job_ids.add(job_id)
                    all_job_links.append((href, title))
                    new_jobs += 1
                else:
                    print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
        return new_jobs
    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {new_jobs} new job(s) (total: {len(all_job_links)})")
            next_btn = await page.query_selector('a[rel="next"]')
            if next_btn:
                next_url = await next_btn.get_attribute("href")
                if next_url and not next_url.startswith("http"):
                    next_url = "https://cryptocurrencyjobs.co" + next_url
                await page.goto(next_url, timeout=120000)
                await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                current_page += 1
            else:
                print("🔚 No 'Next' page — stopping pagination.")
                break
    async def _extract_job_posted_date(self, page) -> str:
        try:
            date_element = await page.query_selector(".job-posted-date, .job-date, time")
            if date_element:
                date_text = await date_element.inner_text()
                if "Today" in date_text:
                    return datetime.now().strftime("%m/%d/%y")
                elif "Yesterday" in date_text:
                    # timedelta handles month and year boundaries, unlike raw day arithmetic
                    yesterday = datetime.now() - timedelta(days=1)
                    return yesterday.strftime("%m/%d/%y")
                else:
                    return datetime.now().strftime("%m/%d/%y")
        except Exception:
            pass
        return datetime.now().strftime("%m/%d/%y")

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data))
            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")
    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        # cryptocurrencyjobs.co uses URL params differently
        encoded_keywords = search_keywords.replace(" ", "%20")
        search_url = f"https://cryptocurrencyjobs.co/?q={encoded_keywords}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()

            # Fetch main search page
            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            all_job_links = []
            seen_job_ids = set()
            print("🔄 Collecting job links from search results...")
            await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
            print(f"✅ Collected {len(all_job_links)} unique job links.")

            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1")
                    if not job_page:
                        print(f" ❌ Failed to fetch job page {full_url}")
                        await self._add_job_to_redis_cache(full_url, full_url.rstrip("/").split("/")[-1], "fetch_failure")
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue
                    posted_date = await self._extract_job_posted_date(job_page)
                    await self.engine._human_like_scroll(job_page)
                    await asyncio.sleep(2 * self.human_speed)
                    page_content = await self._extract_page_content_for_llm(job_page)
                    # Strip any trailing slash so the job id is never an empty string
                    job_id = full_url.rstrip("/").split("/")[-1] or "unknown"
                    raw_data = {
                        "page_content": page_content,
                        "url": full_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = full_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"
                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = search_keywords
                        refined_data['posted_date'] = posted_date
                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {full_url}")
                        await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])
                    await job_page.close()
                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    job_id = full_url.rstrip("/").split("/")[-1] if 'full_url' in locals() else "unknown"
                    job_url = full_url if 'full_url' in locals() else "unknown"
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue
                finally:
                    print(" ↩️ Returning to search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
        else:
            self.engine.report_outcome("scraping_error")
            print("⚠️ No jobs processed successfully.")