Viewing file: arq.py (6.58 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
from __future__ import absolute_import
import sys
from sentry_sdk._compat import reraise from sentry_sdk._types import TYPE_CHECKING from sentry_sdk import Hub from sentry_sdk.consts import OP from sentry_sdk.hub import _should_send_default_pii from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK from sentry_sdk.utils import ( capture_internal_exceptions, event_from_exception, SENSITIVE_DATA_SUBSTITUTE, parse_version, )
try: import arq.worker from arq.version import VERSION as ARQ_VERSION from arq.connections import ArqRedis from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker except ImportError: raise DidNotEnable("Arq is not installed")
if TYPE_CHECKING: from typing import Any, Dict, Optional, Union
from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint
from arq.cron import CronJob from arq.jobs import Job from arq.typing import WorkerCoroutine from arq.worker import Function
ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob)
class ArqIntegration(Integration): identifier = "arq"
@staticmethod def setup_once(): # type: () -> None
try: if isinstance(ARQ_VERSION, str): version = parse_version(ARQ_VERSION) else: version = ARQ_VERSION.version[:2]
except (TypeError, ValueError): version = None
if version is None: raise DidNotEnable("Unparsable arq version: {}".format(ARQ_VERSION))
if version < (0, 23): raise DidNotEnable("arq 0.23 or newer required.")
patch_enqueue_job() patch_run_job() patch_create_worker()
ignore_logger("arq.worker")
def patch_enqueue_job(): # type: () -> None old_enqueue_job = ArqRedis.enqueue_job
async def _sentry_enqueue_job(self, function, *args, **kwargs): # type: (ArqRedis, str, *Any, **Any) -> Optional[Job] hub = Hub.current
if hub.get_integration(ArqIntegration) is None: return await old_enqueue_job(self, function, *args, **kwargs)
with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function): return await old_enqueue_job(self, function, *args, **kwargs)
ArqRedis.enqueue_job = _sentry_enqueue_job
def patch_run_job(): # type: () -> None old_run_job = Worker.run_job
async def _sentry_run_job(self, job_id, score): # type: (Worker, str, int) -> None hub = Hub(Hub.current)
if hub.get_integration(ArqIntegration) is None: return await old_run_job(self, job_id, score)
with hub.push_scope() as scope: scope._name = "arq" scope.clear_breadcrumbs()
transaction = Transaction( name="unknown arq task", status="ok", op=OP.QUEUE_TASK_ARQ, source=TRANSACTION_SOURCE_TASK, )
with hub.start_transaction(transaction): return await old_run_job(self, job_id, score)
Worker.run_job = _sentry_run_job
def _capture_exception(exc_info): # type: (ExcInfo) -> None hub = Hub.current
if hub.scope.transaction is not None: if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: hub.scope.transaction.set_status("aborted") return
hub.scope.transaction.set_status("internal_error")
event, hint = event_from_exception( exc_info, client_options=hub.client.options if hub.client else None, mechanism={"type": ArqIntegration.identifier, "handled": False}, ) hub.capture_event(event, hint=hint)
def _make_event_processor(ctx, *args, **kwargs): # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor def event_processor(event, hint): # type: (Event, Hint) -> Optional[Event]
hub = Hub.current
with capture_internal_exceptions(): if hub.scope.transaction is not None: hub.scope.transaction.name = ctx["job_name"] event["transaction"] = ctx["job_name"]
tags = event.setdefault("tags", {}) tags["arq_task_id"] = ctx["job_id"] tags["arq_task_retry"] = ctx["job_try"] > 1 extra = event.setdefault("extra", {}) extra["arq-job"] = { "task": ctx["job_name"], "args": args if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE, "kwargs": kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE, "retry": ctx["job_try"], }
return event
return event_processor
def _wrap_coroutine(name, coroutine): # type: (str, WorkerCoroutine) -> WorkerCoroutine async def _sentry_coroutine(ctx, *args, **kwargs): # type: (Dict[Any, Any], *Any, **Any) -> Any hub = Hub.current if hub.get_integration(ArqIntegration) is None: return await coroutine(*args, **kwargs)
hub.scope.add_event_processor( _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) )
try: result = await coroutine(ctx, *args, **kwargs) except Exception: exc_info = sys.exc_info() _capture_exception(exc_info) reraise(*exc_info)
return result
return _sentry_coroutine
def patch_create_worker(): # type: () -> None old_create_worker = arq.worker.create_worker
def _sentry_create_worker(*args, **kwargs): # type: (*Any, **Any) -> Worker hub = Hub.current
if hub.get_integration(ArqIntegration) is None: return old_create_worker(*args, **kwargs)
settings_cls = args[0]
functions = settings_cls.functions cron_jobs = settings_cls.cron_jobs
settings_cls.functions = [_get_arq_function(func) for func in functions] settings_cls.cron_jobs = [_get_arq_cron_job(cron_job) for cron_job in cron_jobs]
return old_create_worker(*args, **kwargs)
arq.worker.create_worker = _sentry_create_worker
def _get_arq_function(func): # type: (Union[str, Function, WorkerCoroutine]) -> Function arq_func = arq.worker.func(func) arq_func.coroutine = _wrap_coroutine(arq_func.name, arq_func.coroutine)
return arq_func
def _get_arq_cron_job(cron_job): # type: (CronJob) -> CronJob cron_job.coroutine = _wrap_coroutine(cron_job.name, cron_job.coroutine)
return cron_job
|