Skip to content

Support celery 5.5.0rc4 in opentelemetry.instrumentation.celery #3193

Open
@NixBiks

Description

@NixBiks

The current Celery instrumentation doesn't work with the Celery v5.5.0 release candidate (5.5.0rc4).

Minimal reproducible example

celery_app.py

import logging

from celery import Celery, signals
from opentelemetry import trace
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from rich.logging import RichHandler

# Module-level logger named after this module; used by the task body.
logger = logging.getLogger(__name__)


# setup logging
@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    """Configure logging through ``basicConfig`` and run the logging instrumentor.

    Registered on Celery's ``setup_logging`` signal. The keyword arguments
    passed by the signal are accepted but not used.
    """
    # The format references the otelTraceID / otelSpanID record fields.
    log_format = "[trace_id=%(otelTraceID)s span_id=%(otelSpanID)s]%(message)s"
    rich_handler = RichHandler(rich_tracebacks=True)

    logging.basicConfig(
        handlers=[rich_handler],
        datefmt="[%X]",
        format=log_format,
        level=logging.INFO,
    )

    # Instrument only after the logging configuration is in place.
    LoggingInstrumentor().instrument()


# Set up tracing
@signals.worker_process_init.connect
def setup_celery_tracing(**kwargs):
    """Install a fresh ``TracerProvider`` and instrument Celery.

    Registered on Celery's ``worker_process_init`` signal. The keyword
    arguments passed by the signal are accepted but not used.
    """
    provider = TracerProvider()
    trace.set_tracer_provider(provider)

    # The provider is set before Celery is instrumented.
    celery_instrumentor = CeleryInstrumentor()
    celery_instrumentor.instrument()


# Tracer named "worker"; the ping task opens its span with it.
tracer = trace.get_tracer("worker")

# Celery application with an AMQP broker and the RPC result backend.
celery_app = Celery(
    "worker",
    backend="rpc://",
    broker="amqp://guest:guest@localhost:5672//",
)

# JSON serialization and UTC settings, applied in a single update call.
_celery_settings = {
    "task_serializer": "json",
    "accept_content": ["json"],
    "result_serializer": "json",
    "timezone": "UTC",
    "enable_utc": True,
}
celery_app.conf.update(**_celery_settings)


@celery_app.task(name="ping")
def ping():
    """Handle the ``ping`` task: log, record a ``ping_task`` span, return ``"pong"``."""
    response = "pong"
    logger.info("Received ping task")

    # The second log line is emitted while the span is current.
    with tracer.start_as_current_span("ping_task") as ping_span:
        ping_span.set_attribute("ping", "pong")
        logger.info("Sending pong response")

    return response

Start the Celery worker with `celery -A celery_app worker`.

Now run the following Python script:

import logging

from celery import Celery
from opentelemetry import trace
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from rich.logging import RichHandler

logger = logging.getLogger(__name__)


# Set up logging: the format references the otelTraceID / otelSpanID
# record fields, and output goes through a RichHandler.
_LOG_FORMAT = "[trace_id=%(otelTraceID)s span_id=%(otelSpanID)s]%(message)s"
_rich_handler = RichHandler(rich_tracebacks=True)

logging.basicConfig(
    handlers=[_rich_handler],
    datefmt="[%X]",
    format=_LOG_FORMAT,
    level=logging.INFO,
)

# Instrument only after the logging configuration is in place.
LoggingInstrumentor().instrument()


# Set up tracing: install the provider, instrument Celery, and create the
# application last.
_provider = TracerProvider()
trace.set_tracer_provider(_provider)

_celery_instrumentor = CeleryInstrumentor()
_celery_instrumentor.instrument()

celery_app = Celery(
    "worker",
    backend="rpc://",
    broker="amqp://guest:guest@localhost:5672//",
)

# Send the "ping" task while a "send_celery_task" span is current.
_sender_tracer = trace.get_tracer("sender")
with _sender_tracer.start_as_current_span("send_celery_task") as send_span:
    send_span.set_attribute("celery.task_name", "ping")
    logger.info("Sending task to Celery")
    celery_app.send_task("ping")

Note that the trace IDs in the script and in the Celery task are not the same when using opentelemetry-instrumentation-celery==0.50b0 with celery==5.5.0rc4. However, they are the same with celery==5.4.0. Is it possible to make the instrumentation work with the release candidate?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions