# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db workers."""
import copy
import hashlib
from typing import Any, Optional, TYPE_CHECKING
from django.db import IntegrityError, models, transaction
from django.db.models import CheckConstraint, Count, F, JSONField, Q, QuerySet
from django.utils import timezone
from django.utils.text import slugify
from debusine.db.models import WorkRequest
from debusine.db.models.auth import Token
from debusine.tasks.models import WorkerType
if not TYPE_CHECKING:
    # At runtime the django-stubs helper is not imported; a plain ``object``
    # base is enough for ``class Meta(TypedModelMeta)`` to work.
    TypedModelMeta = object
else:
    # Only imported for static type checkers.
    from django_stubs_ext.db.models import TypedModelMeta
class WorkerManager(models.Manager["Worker"]):
    """Manager for Worker model."""

    def connected(self) -> QuerySet["Worker"]:
        """Return connected workers, ordered by ``connected_at`` (oldest first)."""
        return Worker.objects.filter(connected_at__isnull=False).order_by(
            'connected_at'
        )

    def waiting_for_work_request(self) -> QuerySet["Worker"]:
        """
        Return workers that can be assigned a new work request.

        The workers with fewer associated pending or running work requests
        than their concurrency level could take more work right now and are
        thus waiting for a work request.

        Worker's token must be enabled; Celery workers are exempt from the
        token check (they may have no token at all).

        The result only contains connected workers, ordered by
        ``connected_at``.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        # Number of work requests currently occupying one of the worker's
        # concurrency slots.
        running_work_request_count = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )
        workers = (
            Worker.objects.filter(connected_at__isnull=False)
            .order_by('connected_at')
            .annotate(count_running=running_work_request_count)
            .filter(count_running__lt=F("concurrency"))
            .filter(Q(worker_type=WorkerType.CELERY) | Q(token__enabled=True))
        )
        return workers

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """
        Return name slugified adding "-counter" if counter != 1.

        Dots are turned into dashes before slugifying, so that the parts of
        an FQDN remain separated in the resulting slug.
        """
        new_name = slugify(name.replace('.', '-'))
        if counter != 1:
            new_name += f'-{counter}'
        return new_name

    @classmethod
    def create_with_fqdn(
        cls,
        fqdn: str,
        token: Token,
        worker_type: WorkerType = WorkerType.EXTERNAL,
    ) -> "Worker":
        """
        Return a new Worker with its name based on fqdn, with token.

        The first candidate name is derived from ``fqdn``; while that name
        is already taken, ``-2``, ``-3``, ... suffixes are tried in turn.

        :param fqdn: fully qualified domain name the worker name is based on.
        :param token: authentication token to associate with the worker.
        :param worker_type: type of the new worker.
        :raises IntegrityError: if creating the worker fails for a reason
          other than a name collision (for example ``token`` already belongs
          to another worker), instead of retrying forever.
        """
        counter = 1
        while True:
            name = cls._generate_unique_name(fqdn, counter)
            try:
                # The atomic block rolls back only this failed INSERT, so
                # the connection stays usable for the next attempt.
                with transaction.atomic():
                    return Worker.objects.create(
                        name=name,
                        token=token,
                        worker_type=worker_type,
                        registered_at=timezone.now(),
                    )
            except IntegrityError:
                # Only a name clash can be resolved by trying another
                # suffix; any other violation would recur on every attempt.
                if not Worker.objects.filter(name=name).exists():
                    raise
                counter += 1

    @classmethod
    def get_or_create_celery(cls) -> "Worker":
        """
        Return the Worker representing the Celery task queue.

        The worker (named "celery", of type Celery) is created if it does
        not exist yet.  ``get_or_create`` handles the race where another
        process creates it concurrently.
        """
        worker, _ = Worker.objects.get_or_create(
            name="celery",
            worker_type=WorkerType.CELERY,
            defaults={"registered_at": timezone.now()},
        )
        return worker

    def get_worker_by_token_key_or_none(
        self, token_key: str
    ) -> Optional["Worker"]:
        """
        Return a Worker identified by its associated secret token.

        :param token_key: the secret token key; its SHA-256 hex digest is
          compared against the stored ``Token.hash``.
        :return: the matching Worker, or None if no worker has that token.
        """
        token_hash = hashlib.sha256(token_key.encode()).hexdigest()
        try:
            return Worker.objects.get(token__hash=token_hash)
        except Worker.DoesNotExist:
            return None

    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None
class Worker(models.Model):
    """Database model of a worker."""

    # Unique slug naming the worker.
    name = models.SlugField(
        unique=True,
        help_text='Human readable name of the worker based on the FQDN',
    )
    registered_at = models.DateTimeField()
    # Set by mark_connected() and cleared by mark_disconnected(); None means
    # the worker is currently not connected.
    connected_at = models.DateTimeField(blank=True, null=True)

    # This is the token used by the Worker to authenticate
    # Users have their own tokens - this is specific to a single worker.
    # It may only be NULL for Celery workers (see Meta.constraints).
    token = models.OneToOneField(
        Token, null=True, on_delete=models.PROTECT, related_name="worker"
    )

    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    worker_type = models.CharField(
        max_length=8,
        choices=WorkerType.choices,
        default=WorkerType.EXTERNAL,
        editable=False,
    )

    # Only Celery workers currently support concurrency levels greater than
    # 1.
    concurrency = models.PositiveIntegerField(
        default=1,
        help_text="Number of tasks this worker can run simultaneously",
    )

    class Meta(TypedModelMeta):
        constraints = [
            # Non-Celery workers must have a token.
            CheckConstraint(
                name="%(app_label)s_%(class)s_celery_or_token",
                check=Q(worker_type=WorkerType.CELERY) | Q(token__isnull=False),
            )
        ]

    def mark_disconnected(self) -> None:
        """Update and save relevant Worker fields after disconnecting."""
        self.connected_at = None
        self.save()

    def running_work_requests(self) -> QuerySet["WorkRequest"]:
        """Return queryset of work requests running on this worker, by id."""
        return self.assigned_work_requests.filter(
            status=WorkRequest.Statuses.RUNNING
        ).order_by("id")

    def mark_connected(self) -> None:
        """Update and save relevant Worker fields after connecting."""
        self.connected_at = timezone.now()
        self.save()

    def connected(self) -> bool:
        """Return True if the Worker is connected."""
        return self.connected_at is not None

    def is_busy(self) -> bool:
        """
        Return True if the Worker is busy with work requests.

        A Worker is busy if it has as many running or pending work requests
        as its concurrency level.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        return (
            WorkRequest.objects.running(worker=self)
            | WorkRequest.objects.pending(worker=self)
        ).count() >= self.concurrency

    def __str__(self) -> str:
        """Return the id and name of the Worker."""
        return f"Id: {self.id} Name: {self.name}"

    objects = WorkerManager()