task_manager est la couche d'abstraction entre Mayan et Celery. Elle permet à chaque app de déclarer ses tâches de façon structurée, les regroupe dans des queues, assigne ces queues à des workers, et configure tout ça automatiquement dans Celery au démarrage.
Elle expose aussi une interface UI dans le menu Outils pour visualiser workers, queues et tâches actives.
Worker — processus CeleryUn worker est un processus Celery distinct avec ses propres ressources. Cinq workers sont définis par défaut dans workers.py :
| Worker | Description |
|---|---|
worker_a |
Tâches à faible latence, haut volume |
worker_b |
Tâches à latence moyenne |
worker_c |
Tâches à latence moyenne |
worker_d |
Tâches longues, faible latence |
worker_e |
Tâches longues, faible latence |
Paramètres configurables par variables d'environnement :
| Variable | Rôle |
|---|---|
MAYAN_WORKER_X_CONCURRENCY |
Nombre de processus parallèles (0 = nbre de CPU) |
MAYAN_WORKER_X_MAX_MEMORY_PER_CHILD |
Mémoire max avant remplacement du processus |
MAYAN_WORKER_X_MAX_TASKS_PER_CHILD |
Tâches max avant remplacement du processus |
MAYAN_WORKER_X_NICE_LEVEL |
Priorité OS (0 = normal, >0 = basse priorité) |
CeleryQueue — file d'attenteUne queue est une file de messages Kombu/RabbitMQ/Redis assignée à un worker. Les queues peuvent être persistantes (défaut) ou transientes (transient=True — les messages sont perdus si le broker redémarre, utilisé pour les tâches non critiques).
Chaque app déclare ses queues dans un fichier queues.py chargé automatiquement via AppsModuleLoaderMixin.
TaskType — type de tâcheUn TaskType représente une tâche Celery spécifique. Il est toujours déclaré via sa queue parente :
# Dans queues.py d'une app
queue_ocr = CeleryQueue(
label='OCR', name='ocr', worker=worker_b
)
task_ocr_document_file_page = queue_ocr.add_task_type(
dotted_path='mayan.apps.ocr.tasks.task_ocr_document_file_page',
label='OCR document file page',
)
Un TaskType peut aussi avoir un schedule pour une exécution périodique (Celery Beat) :
from celery.schedules import crontab
task_check_expired = queue_a.add_task_type(
dotted_path='mayan.apps.documents.tasks.task_check_expired_links',
label='Check expired links',
schedule=crontab(minute='*/5'),
)
Au démarrage (CeleryQueue.load_modules() → post_load_modules() → update_celery()), chaque queue enregistre automatiquement dans Celery :
Queue Kombu avec son exchange et routing keytask_routes) pour chaque TaskTypebeat_schedule pour les tâches périodiquesSi une tâche Celery existe mais n'est pas déclarée dans un TaskType, Mayan lève une ImproperlyConfigured au démarrage — ce qui garantit qu'aucune tâche ne passe à travers les mailles.
Dans apps.py, ready() effectue deux tests de connectivité bloquants avant de lancer l'application :
check_broker_connectivity) — connexion à RabbitMQ/Redis. Si ça échoue → exit(1)check_results_backend_connectivity) — écrit et supprime une clé de test. Si ça échoue → exit(1)Mayan refuse de démarrer si l'infrastructure Celery n'est pas disponible.
# Supprimer toutes les tâches périodiques de la base (django_celery_beat)
./manage.py task_manager_purge_periodic_tasks --settings=mayan.settings.development
En dev, CELERY_TASK_ALWAYS_EAGER=True — les tâches s'exécutent synchroniquement inline, sans workers. En production :
# Lancer un worker spécifique
celery -A mayan.celery worker -Q queue_ocr,queue_documents_fast -n worker_b@%h --loglevel=info
# Via Docker Compose (profils dédiés par worker)
COMPOSE_PROFILES=all_in_one,worker_a,worker_b docker-compose up
| Fichier | Rôle |
|---|---|
classes.py |
Worker, CeleryQueue, TaskType, Task |
workers.py |
Déclaration des 5 workers par défaut (worker_a → worker_e) |
apps.py |
Vérification broker/backend, chargement des queues, colonnes UI |
utils.py |
purge_periodic_tasks() |
queues.py |
(dans chaque app) Déclaration des queues et types de tâches |