Add AshbyJobScraper and Sender classes for job scraping and message sending; implement Redis caching and RabbitMQ integration.
This commit is contained in:
parent
2d22fbdb92
commit
762846cb4a
339  scraper.py  Normal file
@@ -0,0 +1,339 @@
import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import redis
import pika
from tenacity import retry, stop_after_attempt, wait_exponential
import logging

# Import your engine
from scraping_engine import FingerprintScrapingEngine

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Environment variables
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "rabbitq.thejobhub.xyz")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
REDIS_HOST = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))


class AshbyJobScraper:
    def __init__(
        self,
        engine: FingerprintScrapingEngine,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
        self.browser = None
        self.context = None

    async def init_browser(self):
        """Initialize browser once using engine's fingerprint"""
        if self.browser is None:
            profile = self.engine._select_profile()
            renderer = random.choice(self.engine.common_renderers[self.engine.os])
            vendor = random.choice(self.engine.common_vendors)
            spoof_script = self.engine._get_spoof_script(renderer, vendor)

            pw = await async_playwright().start()
            self.browser = await pw.chromium.launch(
                headless=True,
                args=['--disable-blink-features=AutomationControlled']
            )
            self.context = await AsyncNewContext(self.browser, fingerprint=profile)
            await self.context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await self.context.add_init_script(spoof_script)

    async def close_browser(self):
        if self.browser:
            await self.browser.close()
            self.browser = None

    async def _safe_inner_text(self, element):
        if not element:
            return "Unknown"
        try:
            return await element.text_content()
        except Exception:
            return "Unknown"

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
        await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        speed = self.engine.optimization_params.get("base_delay", 2.0)
        await asyncio.sleep(2 * (speed / 2))
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * (speed / 2))
        return await page.content()

    async def _is_job_seen(self, job_id: str) -> bool:
        return self.redis_client.get(f"seen_job:{job_id}") is not None

    async def _mark_job_seen(self, job_id: str):
        self.redis_client.setex(f"seen_job:{job_id}", 7 * 24 * 3600, "1")

    async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
        cached = self.redis_client.get(f"llm_cache:{job_url}")
        if cached:
            return json.loads(cached)
        return None

    async def _cache_llm_result(self, job_url: str, result: Dict):
        self.redis_client.setex(f"llm_cache:{job_url}", 7 * 24 * 3600, json.dumps(result))

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
            logger.info(f"📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            logger.error(f"❌ Failed to add to Redis: {str(e)}")

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def scrape_job(
        self,
        job_url: str,
        company_name: str,
        message_id: str
    ):
        job_id = job_url.strip("/").split("/")[-1]

        if await self._is_job_seen(job_id):
            logger.info(f"⏭️ Skipping already processed job: {job_id}")
            return True

        cached_result = await self._get_cached_llm_result(job_url)
        if cached_result:
            logger.info(f"📦 Using cached LLM result for: {job_url}")
            await self.llm_agent.save_job_data(cached_result, company_name)
            await self._mark_job_seen(job_id)
            return True

        page = None
        start_time = time.time()
        try:
            await self.init_browser()
            page = await self.context.new_page()

            # Fetch with timeout from engine config
            timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
            temp_fetcher = StealthyFetcher(self.engine, self.browser, self.context)
            job_page = await asyncio.wait_for(
                temp_fetcher.fetch_url(job_url, wait_for_selector="h1"),
                timeout=timeout_ms / 1000.0
            )

            if not job_page:
                await self._add_job_to_redis_cache(job_url, job_id, "fetch_failure")
                self.engine.report_outcome("fetch_failure", url=job_url)
                return False

            # Handle Cloudflare if detected
            if await self.engine._detect_cloudflare(job_page):
                success = await self.engine._handle_cloudflare(job_page)
                if not success:
                    await self._add_job_to_redis_cache(job_url, job_id, "cloudflare")
                    self.engine.report_outcome("cloudflare", url=job_url)
                    return False

            apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
            apply_type = 'signup'
            if apply_btn:
                await self._human_click(job_page, apply_btn)
                speed = self.engine.optimization_params.get("base_delay", 2.0)
                await asyncio.sleep(2 * (speed / 2))
                form = await job_page.query_selector("form, div[class*='application-form']")
                if form:
                    apply_type = 'AI'

            final_url = job_url
            page_content = await self._extract_page_content_for_llm(job_page)
            posted_date = datetime.now().strftime("%m/%d/%y")

            raw_data = {
                "page_content": page_content,
                "url": final_url,
                "job_id": job_id,
                "search_keywords": company_name,
                "posted_date": posted_date
            }

            # LLM call with timeout
            llm_timeout = max(30, self.engine.feedback.get("avg_response_time", 10) * 2)
            refined_data = await asyncio.wait_for(
                self.llm_agent.refine_job_data(raw_data, self.user_request),
                timeout=llm_timeout
            )

            success = False
            if refined_data and refined_data.get("title", "N/A") != "N/A":
                compulsory_fields = ['company_name', 'job_id', 'url']
                for field in compulsory_fields:
                    if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                        if field == 'job_id':
                            refined_data[field] = job_id
                        elif field == 'url':
                            refined_data[field] = final_url
                        elif field == 'company_name':
                            refined_data[field] = company_name

                refined_data['apply_type'] = apply_type
                refined_data['scraped_at'] = datetime.now().isoformat()
                refined_data['category'] = company_name
                refined_data['posted_date'] = posted_date
                refined_data['message_id'] = message_id

                await self.llm_agent.save_job_data(refined_data, company_name)
                await self._cache_llm_result(job_url, refined_data)
                await self._mark_job_seen(job_id)

                response_time = time.time() - start_time
                self.engine.report_outcome("success", url=final_url, response_time=response_time)
                logger.info(f"✅ Scraped: {refined_data['title'][:50]}...")
                success = True
            else:
                logger.warning(f"🟡 LLM failed to refine: {final_url}")
                await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                self.engine.report_outcome("llm_failure", url=final_url)

            return success

        except asyncio.TimeoutError:
            logger.error(f"⏰ Timeout processing job: {job_url}")
            await self._add_job_to_redis_cache(job_url, job_id, "timeout")
            self.engine.report_outcome("timeout", url=job_url)
            return False
        except Exception as e:
            logger.error(f"💥 Error processing job {job_url}: {str(e)}")
            await self._add_job_to_redis_cache(job_url, job_id, "exception")
            self.engine.report_outcome("exception", url=job_url)
            return False
        finally:
            if page:
                await page.close()


# Global metrics
METRICS = {
    "processed": 0,
    "success": 0,
    "failed": 0,
    "skipped": 0,
    "start_time": time.time()
}


async def process_message_async(scraper: AshbyJobScraper, ch, method, properties, body):
    try:
        job_data = json.loads(body)
        job_link = job_data['job_link']
        company_name = job_data['company_name']
        message_id = properties.message_id or f"msg_{int(time.time()*1000)}"

        logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")

        success = await scraper.scrape_job(job_link, company_name, message_id)

        METRICS["processed"] += 1
        if success:
            METRICS["success"] += 1
        else:
            METRICS["failed"] += 1

    except json.JSONDecodeError:
        logger.error("❌ Invalid JSON in message")
        METRICS["failed"] += 1
    except Exception as e:
        logger.error(f"💥 Unexpected error: {str(e)}")
        METRICS["failed"] += 1
    finally:
        ch.basic_ack(delivery_tag=method.delivery_tag)


def callback_wrapper(scraper: AshbyJobScraper):
    def callback(ch, method, properties, body):
        asyncio.run(process_message_async(scraper, ch, method, properties, body))
    return callback


def start_consumer():
    # Initialize REAL engine
    engine = FingerprintScrapingEngine(
        seed="ashby_scraper",
        target_os="windows",
        num_variations=10
    )
    scraper = AshbyJobScraper(engine)

    # RabbitMQ connection with retries
    connection = None
    for attempt in range(5):
        try:
            credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
            parameters = pika.ConnectionParameters(
                host=RABBITMQ_HOST,
                port=RABBITMQ_PORT,
                virtual_host='/',
                credentials=credentials,
                heartbeat=600,
                blocked_connection_timeout=300
            )
            connection = pika.BlockingConnection(parameters)
            break
        except Exception as e:
            logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
            time.sleep(2 ** attempt)

    if not connection:
        logger.error("Failed to connect to RabbitMQ after retries")
        return

    channel = connection.channel()
    channel.queue_declare(queue='job_queue', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))

    logger.info('Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        logger.info("Shutting down...")
        channel.stop_consuming()
        connection.close()
        asyncio.run(scraper.close_browser())


if __name__ == "__main__":
    start_consumer()
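
Note (editorial sketch, not part of this commit): the consumer above and sender.py below communicate through the durable 'job_queue' queue; each message is a JSON body with 'job_link' and 'company_name', published persistently (delivery_mode=2) with a UUID message_id. The snippet below hand-publishes one such message for testing, assuming the same RabbitMQ environment variables; the URL and company name are placeholders.

# Sketch only: hand-publish one message in the format AshbyJobScraper consumes.
# The job link and company name below are placeholders.
import json
import os
import uuid

import pika

credentials = pika.PlainCredentials(os.getenv("RABBITMQ_USER", "guest"),
                                    os.getenv("RABBITMQ_PASS", "guest"))
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=os.getenv("RABBITMQ_HOST", "rabbitq.thejobhub.xyz"),
    port=int(os.getenv("RABBITMQ_PORT", "5672")),
    credentials=credentials,
))
channel = connection.channel()
channel.queue_declare(queue='job_queue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='job_queue',
    body=json.dumps({"job_link": "https://jobs.ashbyhq.com/example/123",
                     "company_name": "ExampleCo"}),
    properties=pika.BasicProperties(delivery_mode=2, message_id=str(uuid.uuid4())),
)
connection.close()
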
181  sender.py  Normal file
@@ -0,0 +1,181 @@
import csv
import json
import logging
import os
import sys
import time
import signal
import uuid
from configparser import ConfigParser
import pika
import redis


class Sender:
    def __init__(self, config_file='config.ini'):
        self.config = ConfigParser()
        self.config.read(config_file)

        # RabbitMQ from env vars with fallbacks
        self.rabbitmq_host = os.getenv("RABBITMQ_HOST", self.config.get('rabbitmq', 'url', fallback='rabbitq.thejobhub.xyz'))
        self.rabbitmq_port = int(os.getenv("RABBITMQ_PORT", self.config.get('rabbitmq', 'port', fallback='5672')))
        self.username = os.getenv("RABBITMQ_USER", self.config.get('rabbitmq', 'username', fallback='guest'))
        self.password = os.getenv("RABBITMQ_PASS", self.config.get('rabbitmq', 'password', fallback='guest'))
        self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
        self.directory = self.config.get('files', 'directory', fallback='/var/jobs/csv')
        self.log_file = self.config.get('logging', 'log_file', fallback='/var/logs/sender.log')
        self.virtual_host = self.config.get('rabbitmq', 'virtual_hash', fallback='/')
        self.batch_size = 500
        self.retry_attempts = 5  # Increased for robustness
        self.retry_sleep = 2
        self.check_interval = 30  # More frequent polling

        # Redis for deduplication
        redis_host = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz")
        redis_port = int(os.getenv("REDIS_PORT", "6379"))
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=1, decode_responses=True)

        logging.basicConfig(filename=self.log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

        self.connection = None
        self.channel = None
        self.running = True

        signal.signal(signal.SIGTERM, self.graceful_shutdown)
        signal.signal(signal.SIGINT, self.graceful_shutdown)

    def connect(self):
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            parameters = pika.ConnectionParameters(
                host=self.rabbitmq_host,
                port=self.rabbitmq_port,
                virtual_host=self.virtual_host,
                credentials=credentials,
                heartbeat=600,
                blocked_connection_timeout=300
            )
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            self.logger.info("Connected to RabbitMQ")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
            return False

    def reconnect(self):
        if self.connection and self.connection.is_open:
            self.connection.close()
        time.sleep(self.retry_sleep)
        return self.connect()

    def send_message(self, message, message_id):
        for attempt in range(self.retry_attempts):
            try:
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.queue_name,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                        message_id=message_id
                    )
                )
                return True
            except Exception as e:
                self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}")
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_sleep * (2 ** attempt))  # Exponential backoff
                    if not self.reconnect():
                        return False
        return False

    def is_job_seen(self, job_url):
        """Check if job was sent recently (7 days)"""
        return self.redis_client.get(f"sent_job:{job_url}") is not None

    def mark_job_sent(self, job_url):
        """Mark job as sent with 7-day TTL"""
        self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")

    def process_csv(self, file_path):
        try:
            with open(file_path, 'r') as csvfile:
                reader = csv.DictReader(csvfile)
                batch = []
                sent_count = 0
                skipped_count = 0

                for row in reader:
                    # Validate required fields
                    if 'job_link' not in row or 'company_name' not in row:
                        self.logger.warning(f"Skipping invalid row in {file_path}: {row}")
                        continue

                    job_link = row['job_link'].strip()
                    company_name = row['company_name'].strip()

                    if not job_link or not company_name:
                        self.logger.warning(f"Skipping empty row in {file_path}")
                        continue

                    # Deduplication
                    if self.is_job_seen(job_link):
                        skipped_count += 1
                        continue

                    job_data = {
                        'job_link': job_link,
                        'company_name': company_name
                    }

                    message_id = str(uuid.uuid4())
                    message = json.dumps(job_data)

                    if self.send_message(message, message_id):
                        sent_count += 1
                        self.mark_job_sent(job_link)
                    else:
                        self.logger.error(f"Failed to send job: {job_link}")

                    if (sent_count + skipped_count) % 100 == 0:
                        self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path}")

            self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")
            os.rename(file_path, file_path + '.processed')
            self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
            return sent_count
        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            return 0

    def find_new_csvs(self):
        files = [f for f in os.listdir(self.directory) if f.endswith('.csv') and not f.endswith('.processed')]
        files.sort()
        return [os.path.join(self.directory, f) for f in files]

    def run(self):
        if not self.connect():
            sys.exit(1)
        while self.running:
            new_files = self.find_new_csvs()
            if new_files:
                for file_path in new_files:
                    self.logger.info(f"Processing {file_path}")
                    sent = self.process_csv(file_path)
                    self.logger.info(f"Sent {sent} jobs from {file_path}")
            else:
                self.logger.info("No new CSV files found")
            time.sleep(self.check_interval)
        if self.connection and self.connection.is_open:
            self.connection.close()

    def graceful_shutdown(self, signum, frame):
        self.logger.info("Received shutdown signal")
        self.running = False


if __name__ == '__main__':
    sender = Sender()
    sender.run()
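
Note (editorial sketch, not part of this commit): both files keep their state in Redis: the scraper (db 0) writes 7-day 'seen_job:' and 'llm_cache:' keys plus a 'failed_jobs' hash, and the sender (db 1) writes 7-day 'sent_job:' keys. The snippet below only reads those keys to check progress, assuming the same Redis defaults as the code above.

# Sketch only: inspect the dedup and failure-tracking keys written above.
import json
import os

import redis

host = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz")
port = int(os.getenv("REDIS_PORT", "6379"))
scraper_db = redis.Redis(host=host, port=port, db=0, decode_responses=True)
sender_db = redis.Redis(host=host, port=port, db=1, decode_responses=True)

# Failure records live in a hash keyed by job_id (see _add_job_to_redis_cache).
for job_id, payload in scraper_db.hgetall("failed_jobs").items():
    record = json.loads(payload)
    print(job_id, record["error_type"], record["timestamp"])

# 7-day dedup markers on each side of the queue.
print(sum(1 for _ in scraper_db.scan_iter("seen_job:*")), "jobs processed")
print(sum(1 for _ in sender_db.scan_iter("sent_job:*")), "jobs sent")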