Compare commits
2 Commits: 37da7b2c1a ... 1005dfc041

| Author | SHA1 | Date |
|---|---|---|
| | 1005dfc041 | |
| | d68018224b | |
llm_agent1.py (new file, 303 lines)
@@ -0,0 +1,303 @@
from openai import OpenAI
from typing import Dict, Any, Optional
import asyncio
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()


class LLMJobRefiner:
    def __init__(self):
        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
        if not deepseek_api_key:
            raise ValueError("DEEPSEEK_API_KEY not found in .env file.")

        # Database credentials from .env
        self.db_url = os.getenv("DB_URL")
        self.db_username = os.getenv("DB_USERNAME")
        self.db_password = os.getenv("DB_PASSWORD")
        self.db_host = os.getenv("DB_HOST")
        self.db_port = os.getenv("DB_PORT")

        if not self.db_url or not self.db_username or not self.db_password:
            raise ValueError("Database credentials not found in .env file.")

        # DeepSeek exposes an OpenAI-compatible API
        self.client = OpenAI(
            api_key=deepseek_api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.model = "deepseek-chat"
        self._init_db()
    def _init_db(self):
        """Initialize the PostgreSQL database connection and create the table."""
        try:
            # Works for a Supabase-hosted instance or any other PostgreSQL server;
            # both cases use the same credentials loaded from .env.
            conn = psycopg2.connect(
                host=self.db_host,
                port=self.db_port,
                database="postgres",
                user=self.db_username,
                password=self.db_password
            )
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS crypto_jobs (
                    id SERIAL PRIMARY KEY,
                    title TEXT,
                    company_name TEXT,
                    location TEXT,
                    description TEXT,
                    requirements TEXT,
                    qualifications TEXT,
                    salary_range TEXT,
                    nature_of_work TEXT,
                    job_id TEXT UNIQUE,
                    url TEXT,
                    category TEXT,
                    scraped_at TIMESTAMP,
                    posted_date TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Ensure the uniqueness constraint exists
            cursor.execute('''
                ALTER TABLE crypto_jobs DROP CONSTRAINT IF EXISTS crypto_jobs_job_id_key;
                ALTER TABLE crypto_jobs ADD CONSTRAINT crypto_jobs_job_id_key UNIQUE (job_id);
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON crypto_jobs(job_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON crypto_jobs(category)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON crypto_jobs(posted_date)')

            conn.commit()
            cursor.close()
            conn.close()
            print("✅ PostgreSQL database initialized successfully")
        except Exception as e:
            print(f"❌ Database initialization error: {e}")
            raise
    def _clean_html_for_llm(self, html_content: str) -> str:
        """Clean HTML so it is more readable for the LLM while preserving structure."""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Remove script, style, and boilerplate layout elements
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()

            # Extract text but keep some structure
            text = soup.get_text(separator=' ', strip=True)

            # Clean up whitespace
            text = re.sub(r'\s+', ' ', text)

            # Limit length for the LLM context window
            if len(text) > 10000:
                text = text[:10000] + "..."

            return text
        except Exception as e:
            print(f"HTML cleaning error: {e}")
            # Fall back to (truncated) raw content if cleaning fails
            return html_content[:100000] if len(html_content) > 100000 else html_content
    def _generate_content_sync(self, prompt: str) -> str:
        """Synchronous call to the DeepSeek API."""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=2048,
                stream=False
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            print(f"DeepSeek API error: {e}")
            return ""
    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        page_content = raw_data.get('page_content', '')
        cleaned_content = self._clean_html_for_llm(page_content)
        job_id = raw_data.get('job_id', 'unknown')
        url = raw_data.get('url', 'N/A')
        posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))

        prompt = f"""
You are a job posting data extractor.

EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT.

For these critical fields, follow these rules:
- description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists.
- requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist.
- qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist.

REQUIRED FIELDS (must have valid values, never "N/A"):
- title, company_name, job_id, url

OPTIONAL FIELDS (can be "Not provided"):
- location, salary_range, nature_of_work

Page Content:
{cleaned_content}

Response format (ONLY return this JSON):
{{
    "title": "...",
    "company_name": "...",
    "location": "...",
    "description": "...",
    "requirements": "...",
    "qualifications": "...",
    "salary_range": "...",
    "nature_of_work": "...",
    "job_id": "{job_id}",
    "url": "{url}"
}}
"""

        try:
            response_text = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self._generate_content_sync(prompt)
            )
            refined_data = self._parse_llm_response(response_text)

            if not refined_data:
                return None

            # Validate required fields
            required_fields = ['title', 'company_name', 'job_id', 'url']
            for field in required_fields:
                if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]:
                    return None

            # CRITICAL: validate content fields - check whether they SHOULD exist
            content_fields = ['description', 'requirements', 'qualifications']
            cleaned_original = cleaned_content.lower()

            # Simple heuristic: if the page contains job-related keywords, content fields should NOT be "Not provided"
            job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master']
            has_job_content = any(indicator in cleaned_original for indicator in job_indicators)

            if has_job_content:
                for field in content_fields:
                    value = refined_data.get(field, "").strip()
                    if value in ["Not provided", "N/A", ""]:
                        # The LLM failed to extract content that appears to be present
                        print(f"   ⚠️ LLM returned '{value}' for {field} but job content appears present")
                        return None

            # Add the posted_date to the refined data
            refined_data['posted_date'] = posted_date

            return refined_data

        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None
    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        # Prefer JSON inside a fenced ```json ... ``` block, then fall back to the first {...} span
        json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if json_match:
            json_str = json_match.group(1)
        else:
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if not json_match:
                return None
            json_str = json_match.group(0)

        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            return None
    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)
    async def _save_to_db(self, job_data: Dict[str, Any]):
        """Save job data to PostgreSQL database with job_id uniqueness"""
        try:
            conn = psycopg2.connect(
                host=self.db_host,
                port=self.db_port,
                database="postgres",
                user=self.db_username,
                password=self.db_password
            )
            cursor = conn.cursor()

            cursor.execute('''
                INSERT INTO crypto_jobs
                (title, company_name, location, description, requirements,
                 qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (job_id) DO NOTHING
            ''', (
                job_data.get("title", "N/A"),
                job_data.get("company_name", "N/A"),
                job_data.get("location", "N/A"),
                job_data.get("description", "N/A"),
                job_data.get("requirements", "N/A"),
                job_data.get("qualifications", "N/A"),
                job_data.get("salary_range", "N/A"),
                job_data.get("nature_of_work", "N/A"),
                job_data.get("job_id", "N/A"),
                job_data.get("url", "N/A"),
                job_data.get("category", "N/A"),
                job_data.get("scraped_at"),
                job_data.get("posted_date", "N/A")
            ))

            conn.commit()
            cursor.close()
            conn.close()

            print(f"   💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}")

        except Exception as e:
            print(f"❌ Database save error: {e}")
    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        os.makedirs("linkedin_jobs", exist_ok=True)
        filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md")
        write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0

        with open(filepath, "a", encoding="utf-8") as f:
            if write_header:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
            f.write(f"- **Keyword**: {keyword}\n")
            f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n")
            f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n")
            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
            f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
            f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n")
            f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n")
            f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n")
            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
            f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
            f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
            f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
            f.write("---\n\n")
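
For reference, a minimal sketch of the `.env` file this module expects, inferred from the `os.getenv()` calls above; every value is a placeholder, not a real credential:

```env
DEEPSEEK_API_KEY=sk-xxxxxxxxxxxxxxxx
DB_URL=postgresql://postgres:change-me@your-project.supabase.co:5432/postgres
DB_USERNAME=postgres
DB_PASSWORD=change-me
DB_HOST=your-project.supabase.co
DB_PORT=5432
```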
main.py (new file, 54 lines)
@@ -0,0 +1,54 @@
from scraping_engine import FingerprintScrapingEngine
from scraper import CryptoJobScraper  # Updated class name
import os
from dotenv import load_dotenv
import asyncio
import random

load_dotenv()


async def main():
    engine = FingerprintScrapingEngine(
        seed="crypto_scraping_12",
        target_os="windows",
        db_path="crypto_jobs.db",
        markdown_path="crypto_jobs.md"
    )

    scraper = CryptoJobScraper(
        engine,
        human_speed=1.3,
        user_request="Extract title, company, location, description, requirements, qualifications, nature of work, and salary"
    )

    job_titles = [
        "Blockchain Engineer",
        "Smart Contract Developer",
        "DeFi Analyst",
        "Web3 Developer",
        "Crypto Researcher",
        "Solidity Developer",
        "Protocol Engineer",
        "Tokenomics Specialist",
        "Zero-Knowledge Proof Engineer",
        "Crypto Compliance Officer"
    ]

    while True:
        random.shuffle(job_titles)
        for job_title in job_titles:
            search_keywords = job_title  # No location param needed

            print(f"\n{'='*60}")
            print(f"Starting scrape for: {search_keywords}")
            print(f"{'='*60}")

            await scraper.scrape_jobs(search_keywords=search_keywords)

            print(f"\n✅ Completed scraping for: {job_title}")
            print("⏳ Waiting 90 seconds before next job title...")
            # asyncio.sleep (not time.sleep) so the event loop is not blocked during the pause
            await asyncio.sleep(90)

        print("\n✅ Completed full cycle")
        print("🔄 Starting new cycle...")


if __name__ == "__main__":
    asyncio.run(main())
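
The imports across llm_agent1.py, main.py, and scraper.py imply the third-party dependencies sketched below. The PyPI package names are assumptions (for example, `bs4` is published as `beautifulsoup4`), and `scraping_engine`, `scraper`, `fetcher`, and `llm_agent` are local modules rather than installable packages:

```text
openai
psycopg2-binary
beautifulsoup4
python-dotenv
playwright
browserforge
redis
```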
scraper.py (new file, 253 lines)
@@ -0,0 +1,253 @@
import asyncio
import random
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from fetcher import StealthyFetcher
from datetime import datetime, timedelta
import json
import redis


class CryptoJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "crypto_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False
    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0
    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_links = await page.query_selector_all("a[href*='/job/']")
        new_jobs = 0

        for link in current_links:
            href = await link.get_attribute("href")
            if not href:
                continue
            if not href.startswith("http"):
                href = "https://cryptocurrencyjobs.co" + href
            # Use the last non-empty path segment as the job id (handles trailing slashes)
            job_id = href.rstrip("/").split("/")[-1]

            if job_id and job_id not in seen_job_ids:
                title_element = await link.query_selector("h3, .job-title")
                title = (await title_element.inner_text()) if title_element else "Unknown Title"
                match_percentage = self._calculate_keyword_match(title, search_keywords)

                if match_percentage >= 0.5:  # Lower threshold than LinkedIn
                    seen_job_ids.add(job_id)
                    all_job_links.append((href, title))
                    new_jobs += 1
                else:
                    print(f"   ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
        return new_jobs
    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f"   ➕ Found {new_jobs} new job(s) (total: {len(all_job_links)})")

            next_btn = await page.query_selector('a[rel="next"]')
            if next_btn:
                next_url = await next_btn.get_attribute("href")
                if next_url and not next_url.startswith("http"):
                    next_url = "https://cryptocurrencyjobs.co" + next_url
                await page.goto(next_url, timeout=120000)
                await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                current_page += 1
            else:
                print("🔚 No 'Next' page — stopping pagination.")
                break
    async def _extract_job_posted_date(self, page) -> str:
        try:
            date_element = await page.query_selector(".job-posted-date, .job-date, time")
            if date_element:
                date_text = await date_element.inner_text()
                if "Today" in date_text:
                    return datetime.now().strftime("%m/%d/%y")
                elif "Yesterday" in date_text:
                    # timedelta handles month/year boundaries, unlike replace(day=day - 1)
                    yesterday = datetime.now() - timedelta(days=1)
                    return yesterday.strftime("%m/%d/%y")
                else:
                    return datetime.now().strftime("%m/%d/%y")
        except Exception:
            pass
        return datetime.now().strftime("%m/%d/%y")
    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data))
            print(f"   📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f"   ❌ Failed to add job to Redis cache: {str(e)}")
    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        # cryptocurrencyjobs.co uses URL params differently
        encoded_keywords = search_keywords.replace(" ", "%20")
        search_url = f"https://cryptocurrencyjobs.co/?q={encoded_keywords}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)

            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)

            page = await context.new_page()

            # Fetch the main search page
            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            all_job_links = []
            seen_job_ids = set()

            print("🔄 Collecting job links from search results...")
            await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)

            print(f"✅ Collected {len(all_job_links)} unique job links.")
            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href
                    print(f"   → Opening job {idx + 1}/{len(all_job_links)}: {full_url}")

                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1")
                    if not job_page:
                        print(f"   ❌ Failed to fetch job page {full_url}")
                        await self._add_job_to_redis_cache(full_url, full_url.rstrip("/").split("/")[-1], "fetch_failure")
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue

                    posted_date = await self._extract_job_posted_date(job_page)

                    await self.engine._human_like_scroll(job_page)
                    await asyncio.sleep(2 * self.human_speed)
                    page_content = await self._extract_page_content_for_llm(job_page)

                    # Last non-empty path segment, falling back to "unknown"
                    job_id = full_url.rstrip("/").split("/")[-1] or "unknown"

                    raw_data = {
                        "page_content": page_content,
                        "url": full_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }

                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = full_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"

                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = search_keywords
                        refined_data['posted_date'] = posted_date
                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f"   ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f"   🟡 Could not extract meaningful data from: {full_url}")
                        await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])

                    await job_page.close()

                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f"   ⚠️ Failed on job {idx + 1}: {error_msg}")
                    job_id = full_url.rstrip("/").split("/")[-1] if 'full_url' in locals() else "unknown"
                    job_url = full_url if 'full_url' in locals() else "unknown"
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue

                finally:
                    print("   ↩️ Returning to search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
        else:
            self.engine.report_outcome("scraping_error")
            print("⚠️ No jobs processed successfully.")
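
Failed fetches and LLM failures are parked in the Redis hash `failed_crypto_jobs` by `_add_job_to_redis_cache`, but nothing in this comparison reads them back. A minimal sketch of a retry pass over that hash, assuming the same localhost Redis instance and `decode_responses=True`:

```python
import json

import redis

# Same connection settings as CryptoJobScraper.__init__
client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

# Each field is a job_id; each value is the JSON payload written by _add_job_to_redis_cache
for job_id, payload in client.hgetall("failed_crypto_jobs").items():
    entry = json.loads(payload)
    print(job_id, entry["error_type"], entry["job_url"])
    # After a successful retry, remove the entry so it is not reprocessed:
    # client.hdel("failed_crypto_jobs", job_id)
```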