Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ services:
- PYTHONPATH=/app/src
- PYTHONDONTWRITEBYTECODE=1
- PYTHONUNBUFFERED=1
# Add your database and other environment variables here
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- JAO_BACKEND_SUPERUSER_USERNAME=${JAO_BACKEND_SUPERUSER_USERNAME:-admin_local}
- JAO_BACKEND_SUPERUSER_EMAIL=${JAO_BACKEND_SUPERUSER_EMAIL:-admin_local@example.com}
- JAO_BACKEND_SUPERUSER_PASSWORD=${JAO_BACKEND_SUPERUSER_PASSWORD:-admin123}
Expand All @@ -28,19 +29,25 @@ services:
- ./jao-backend/src:/app/src
depends_on:
- db
- skills-worker
- redis
networks:
- jao-network

jao-worker:
build:
context: .
dockerfile: ./jao-backend/Dockerfile.dev
command: poetry run celery -A jao_backend.common.celery worker --loglevel=INFO
command: poetry run celery -A jao_backend.common.celery worker --loglevel=INFO -Q celery
env_file:
- .env
environment:
- ENV=dev
- PYTHONPATH=/app/src
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}
Expand Down Expand Up @@ -122,6 +129,33 @@ services:
networks:
- jao-network

skills-worker:
platform: linux/arm64
build:
context: .
dockerfile: ./skills-worker/Dockerfile
command: celery -A skills_extraction.skills_celery_app worker --loglevel=INFO -Q skills_queue
env_file:
- .env
environment:
- ENV=dev
- PYTHONPATH=/app
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- POSTGRES_DB=jao_dev
- POSTGRES_USER=jao_user
- POSTGRES_PASSWORD=jao_password
- POSTGRES_HOST=db
- POSTGRES_PORT=5432
- OMP_NUM_THREADS=1
- MKL_NUM_THREADS=1
- TOKENIZERS_PARALLELISM=false
depends_on:
- db
- redis
networks:
- jao-network

volumes:
postgres_data:
redis_data:
Expand Down
2 changes: 1 addition & 1 deletion iac/modules/bastion/main.tf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
modules/bastion/main.tf
#modules/bastion/main.tf
data "aws_ami" "amazon_linux" {
most_recent = true
owners = ["amazon"]
Expand Down
36 changes: 36 additions & 0 deletions iac/modules/ecs/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,24 @@ locals {
}
}
])
# Skills Worker container definitions (Python 3.10)
skills_worker_container_definitions = jsonencode([
{
name = "skills-worker"
image = var.skills_worker_image_url
essential = true
command = ["python", "skills_extraction.py"]
environment = local.base_environment
logConfiguration = {
logDriver = "awslogs"
options = {
"awslogs-group" = local.cloudwatch_log_group_name
"awslogs-region" = data.aws_region.current.name
"awslogs-stream-prefix" = "skills-worker"
}
}
}
])
}

# Data sources
Expand Down Expand Up @@ -602,6 +620,24 @@ resource "aws_ecs_task_definition" "beat" {
)
}

resource "aws_ecs_task_definition" "skills_worker" {
family = "${local.name}-skills-worker-task"
network_mode = "awsvpc"
requires_compatibilities = ["FARGATE"]
cpu = var.skills_worker_cpu
memory = var.skills_worker_memory
execution_role_arn = local.task_execution_role_arn
task_role_arn = local.task_role_arn
container_definitions = local.skills_worker_container_definitions

tags = merge(
local.common_tags,
{
Name = "${local.name}-skills-worker-task-definition"
}
)
}

# ECS service
resource "aws_ecs_service" "api" {
name = "${local.name}-api-service"
Expand Down
19 changes: 19 additions & 0 deletions iac/modules/ecs/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,22 @@ variable "additional_security_group_ids" {
type = list(string)
default = []
}


# Skills Worker Configuration (Python 3.10)
variable "skills_worker_image_url" {
description = "ECR URL for the Python 3.10 skills worker image"
type = string
}

variable "skills_worker_cpu" {
description = "CPU units for skills worker (e.g., 512)"
type = number
default = 512
}

variable "skills_worker_memory" {
description = "Memory for skills worker (e.g., 1024)"
type = number
default = 1024
}
4 changes: 4 additions & 0 deletions jao-backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ channels = "^4.3.1"
channels-redis = "^4.3.0"
daphne = "^4.2.1"
autobahn = "^23.6.2"
sentence-transformers = "^5.2.0"
umap-learn = "^0.5.11"
hdbscan = "^0.8.41"
scikit-learn = "^1.8.0"

[tool.poetry.group.dev.dependencies]
pre-commit = "^3.7.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.0.14 on 2025-10-30 18:02

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('applicant_text', '0001_initial'),
]

operations = [
migrations.AddField(
model_name='vacancytextaggregate',
name='skills_extracted',
field=models.BooleanField(db_index=True, default=False, help_text='True if this text has been processed by the skills-worker.'),
),
]
6 changes: 6 additions & 0 deletions jao-backend/src/jao_backend/applicant_text/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,11 @@ class VacancyTextAggregate(models.Model):
all_employment_history = models.TextField(blank=True)
all_previous_skills = models.TextField(blank=True)

skills_extracted = models.BooleanField(
default=False,
db_index=True,
help_text="True if this text has been processed by the skills-worker."
)

def __str__(self):
return f"Aggregated text for {self.vacancy.pk}"
2 changes: 1 addition & 1 deletion jao-backend/src/jao_backend/common/litellm/model_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get_model_list(cls) -> List[str]:
def is_available(cls) -> bool:
try:
client = boto3.client("bedrock", region_name=BEDROCK_REGION)
client.list_foundation_models(maxResults=1)
#client.list_foundation_models(maxResults=1)
return True
except (ClientError, NoCredentialsError) as e:
logger.error(f"Bedrock not available: {e}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import numpy as np
import litellm
from django.core.management.base import BaseCommand
from django.db import transaction
from django.conf import settings
from sentence_transformers import SentenceTransformer
from umap import UMAP
import hdbscan
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm

from jao_backend.skills.models import Skill, SkillCluster
from jao_backend.common.litellm.model_list import ModelLister

class Command(BaseCommand):
help = "Modular pipeline to cluster skills and name the resulting groups."

def add_arguments(self, parser):
parser.add_argument('--cluster', action='store_true', help='Run UMAP/HDBSCAN clustering math.')
parser.add_argument('--name', action='store_true', help='Run LLM naming loop for unnamed clusters.')
parser.add_argument('--clear', action='store_true', help='Reset all clusters and delete SkillCluster records.')

def handle(self, *args, **options):
if options['clear']:
self.stdout.write(self.style.WARNING("Clearing existing cluster data..."))
with transaction.atomic():
Skill.objects.all().update(cluster_id=-1)
SkillCluster.objects.all().delete()
self.stdout.write(self.style.SUCCESS("Database reset complete."))

if options['cluster']:
self.run_clustering()

if options['name']:
self.run_naming()

if not any([options['cluster'], options['name'], options['clear']]):
self.stdout.write("No flags provided. Use --cluster, --name, or --clear.")

def run_clustering(self):
self.stdout.write("--- Starting Clustering Phase ---")
skills = list(Skill.objects.all())
names = [s.name for s in skills]

if len(names) < 10:
self.stdout.write(self.style.ERROR("Not enough skills to cluster."))
return

model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(names, show_progress_bar=True)

umap_embeddings = UMAP(n_neighbors=15, n_components=2, min_dist=0.0, random_state=42).fit_transform(embeddings)
labels = hdbscan.HDBSCAN(min_cluster_size=5).fit_predict(umap_embeddings)

valid_mask = labels != -1
if np.any(valid_mask) and np.any(~valid_mask):
knn = NearestNeighbors(n_neighbors=1).fit(umap_embeddings[valid_mask])
noise_idx = np.where(labels == -1)[0]
_, nearest = knn.kneighbors(umap_embeddings[noise_idx])
labels[noise_idx] = labels[valid_mask][nearest.flatten()]

clusters_found = np.unique(labels)
with transaction.atomic():
for c_id in clusters_found:
SkillCluster.objects.get_or_create(cluster_id=int(c_id), defaults={'name': f"Cluster {c_id}"})
for c_id in clusters_found:
ids = [skills[i].id for i in np.where(labels == c_id)[0]]
Skill.objects.filter(id__in=ids).update(cluster_id=int(c_id))

self.stdout.write(self.style.SUCCESS(f"Clustered {len(names)} skills into {len(clusters_found)} groups."))

def run_naming(self):
self.stdout.write("--- Starting Naming Phase ---")

if not ModelLister.is_available():
self.stdout.write(self.style.ERROR(f"AI Provider ({settings.LITELLM_CUSTOM_PROVIDER}) is not reachable!"))
return

unnamed = SkillCluster.objects.filter(name__startswith="Cluster ")
if not unnamed.exists():
self.stdout.write("No clusters need naming.")
return

raw_model = settings.LITELLM_COMPLETION_MODEL
full_model_name = ModelLister.get_litellm_model_name(raw_model)

for cluster in tqdm(unnamed, desc="AI Naming"):
skill_sample = list(Skill.objects.filter(cluster_id=cluster.cluster_id).values_list('name', flat=True)[:30])

try:
response = litellm.completion(
model=full_model_name,
messages=[
{"role": "system", "content": "Return ONLY a 2-3 word category name for these skills."},
{"role": "user", "content": f"Skills: {', '.join(skill_sample)}"}
],
max_tokens=15, temperature=0.1
)
name = response.choices[0].message.content.strip().replace('"', '')
if name:
cluster.name = name
cluster.save()
except Exception as e:
self.stderr.write(f"Error at Cluster {cluster.cluster_id}: {e}")

self.stdout.write(self.style.SUCCESS("Naming complete!"))
Loading