mas_storage/queue/
job.rs

// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

//! Repository to interact with jobs in the job queue

8use async_trait::async_trait;
9use chrono::{DateTime, Duration, Utc};
10use opentelemetry::trace::TraceContextExt;
11use rand_core::RngCore;
12use serde::{Deserialize, Serialize};
13use tracing_opentelemetry::OpenTelemetrySpanExt;
14use ulid::Ulid;
15
16use super::Worker;
17use crate::{Clock, repository_impl};
18
19/// Represents a job in the job queue
20pub struct Job {
21    /// The ID of the job
22    pub id: Ulid,
23
24    /// The queue on which the job was placed
25    pub queue_name: String,
26
27    /// The payload of the job
28    pub payload: serde_json::Value,
29
30    /// Arbitrary metadata about the job
31    pub metadata: JobMetadata,
32
33    /// Which attempt it is
34    pub attempt: usize,
35}
36
37/// Metadata stored alongside the job
38#[derive(Serialize, Deserialize, Default, Clone, Debug)]
39pub struct JobMetadata {
40    #[serde(default)]
41    trace_id: String,
42
43    #[serde(default)]
44    span_id: String,
45
46    #[serde(default)]
47    trace_flags: u8,
48}
49
50impl JobMetadata {
51    fn new(span_context: &opentelemetry::trace::SpanContext) -> Self {
52        Self {
53            trace_id: span_context.trace_id().to_string(),
54            span_id: span_context.span_id().to_string(),
55            trace_flags: span_context.trace_flags().to_u8(),
56        }
57    }
58
59    /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`]
60    #[must_use]
61    pub fn span_context(&self) -> opentelemetry::trace::SpanContext {
62        use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
63        SpanContext::new(
64            TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID),
65            SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID),
66            TraceFlags::new(self.trace_flags),
67            // Trace context is remote, as it comes from another service/from the database
68            true,
69            TraceState::NONE,
70        )
71    }
72}
73
74/// A trait that represents a job which can be inserted into a queue
75pub trait InsertableJob: Serialize + Send {
76    /// The name of the queue this job belongs to
77    const QUEUE_NAME: &'static str;
78}
79
80/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
81/// worker.
82#[async_trait]
83pub trait QueueJobRepository: Send + Sync {
84    /// The error type returned by the repository.
85    type Error;
86
87    /// Schedule a job to be executed as soon as possible by a worker.
88    ///
89    /// # Parameters
90    ///
91    /// * `rng` - The random number generator used to generate a new job ID
92    /// * `clock` - The clock used to generate timestamps
93    /// * `queue_name` - The name of the queue to schedule the job on
94    /// * `payload` - The payload of the job
95    /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the underlying repository fails.
100    async fn schedule(
101        &mut self,
102        rng: &mut (dyn RngCore + Send),
103        clock: &dyn Clock,
104        queue_name: &str,
105        payload: serde_json::Value,
106        metadata: serde_json::Value,
107    ) -> Result<(), Self::Error>;
108
109    /// Schedule a job to be executed at a later date by a worker.
110    ///
111    /// # Parameters
112    ///
113    /// * `rng` - The random number generator used to generate a new job ID
114    /// * `clock` - The clock used to generate timestamps
115    /// * `queue_name` - The name of the queue to schedule the job on
116    /// * `payload` - The payload of the job
117    /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
118    /// * `scheduled_at` - The date and time to schedule the job for
119    /// * `schedule_name` - The name of the recurring schedule which scheduled
120    ///   this job
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the underlying repository fails.
125    #[allow(clippy::too_many_arguments)]
126    async fn schedule_later(
127        &mut self,
128        rng: &mut (dyn RngCore + Send),
129        clock: &dyn Clock,
130        queue_name: &str,
131        payload: serde_json::Value,
132        metadata: serde_json::Value,
133        scheduled_at: DateTime<Utc>,
134        schedule_name: Option<&str>,
135    ) -> Result<(), Self::Error>;
136
137    /// Reserve multiple jobs from multiple queues
138    ///
139    /// # Parameters
140    ///
141    /// * `clock` - The clock used to generate timestamps
142    /// * `worker` - The worker that is reserving the jobs
143    /// * `queues` - The queues to reserve jobs from
144    /// * `count` - The number of jobs to reserve
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the underlying repository fails.
149    async fn reserve(
150        &mut self,
151        clock: &dyn Clock,
152        worker: &Worker,
153        queues: &[&str],
154        count: usize,
155    ) -> Result<Vec<Job>, Self::Error>;
156
157    /// Mark a job as completed
158    ///
159    /// # Parameters
160    ///
161    /// * `clock` - The clock used to generate timestamps
162    /// * `id` - The ID of the job to mark as completed
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the underlying repository fails.
167    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
168
169    /// Marks a job as failed.
170    ///
171    /// # Parameters
172    ///
173    /// * `clock` - The clock used to generate timestamps
174    /// * `id` - The ID of the job to mark as failed
175    /// * `reason` - The reason for the failure
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the underlying repository fails.
180    async fn mark_as_failed(
181        &mut self,
182        clock: &dyn Clock,
183        id: Ulid,
184        reason: &str,
185    ) -> Result<(), Self::Error>;
186
187    /// Retry a job.
188    ///
189    /// # Parameters
190    ///
191    /// * `rng` - The random number generator used to generate a new job ID
192    /// * `clock` - The clock used to generate timestamps
193    /// * `id` - The ID of the job to reschedule
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the underlying repository fails.
198    async fn retry(
199        &mut self,
200        rng: &mut (dyn RngCore + Send),
201        clock: &dyn Clock,
202        id: Ulid,
203        delay: Duration,
204    ) -> Result<(), Self::Error>;
205
206    /// Mark all scheduled jobs past their scheduled date as available to be
207    /// executed.
208    ///
209    /// Returns the number of jobs that were marked as available.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the underlying repository fails.
214    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
215}
216
// NOTE(review): `repository_impl!` comes from the crate root and is not
// visible here. It presumably generates forwarding implementations of
// `QueueJobRepository` for wrapper/pointer types; confirm against its
// definition. The signatures listed below repeat the trait declaration
// one-for-one and should be kept in sync with it when the trait changes.
repository_impl!(QueueJobRepository:
    async fn schedule(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
    ) -> Result<(), Self::Error>;

    async fn schedule_later(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
        scheduled_at: DateTime<Utc>,
        schedule_name: Option<&str>,
    ) -> Result<(), Self::Error>;

    async fn reserve(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
        queues: &[&str],
        count: usize,
    ) -> Result<Vec<Job>, Self::Error>;

    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;

    async fn mark_as_failed(&mut self,
        clock: &dyn Clock,
        id: Ulid,
        reason: &str,
    ) -> Result<(), Self::Error>;

    async fn retry(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        id: Ulid,
        delay: Duration,
    ) -> Result<(), Self::Error>;

    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
);
264
265/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
266/// through the [`InsertableJob`] trait. This isn't in the
267/// [`QueueJobRepository`] trait to keep it object safe.
268#[async_trait]
269pub trait QueueJobRepositoryExt: QueueJobRepository {
270    /// Schedule a job to be executed as soon as possible by a worker.
271    ///
272    /// # Parameters
273    ///
274    /// * `rng` - The random number generator used to generate a new job ID
275    /// * `clock` - The clock used to generate timestamps
276    /// * `job` - The job to schedule
277    ///
278    /// # Errors
279    ///
280    /// Returns an error if the underlying repository fails.
281    async fn schedule_job<J: InsertableJob>(
282        &mut self,
283        rng: &mut (dyn RngCore + Send),
284        clock: &dyn Clock,
285        job: J,
286    ) -> Result<(), Self::Error>;
287
288    /// Schedule a job to be executed at a later date by a worker.
289    ///
290    /// # Parameters
291    ///
292    /// * `rng` - The random number generator used to generate a new job ID
293    /// * `clock` - The clock used to generate timestamps
294    /// * `job` - The job to schedule
295    /// * `scheduled_at` - The date and time to schedule the job for
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the underlying repository fails.
300    async fn schedule_job_later<J: InsertableJob>(
301        &mut self,
302        rng: &mut (dyn RngCore + Send),
303        clock: &dyn Clock,
304        job: J,
305        scheduled_at: DateTime<Utc>,
306    ) -> Result<(), Self::Error>;
307}
308
309#[async_trait]
310impl<T> QueueJobRepositoryExt for T
311where
312    T: QueueJobRepository,
313{
314    #[tracing::instrument(
315        name = "db.queue_job.schedule_job",
316        fields(
317            queue_job.queue_name = J::QUEUE_NAME,
318        ),
319        skip_all,
320    )]
321    async fn schedule_job<J: InsertableJob>(
322        &mut self,
323        rng: &mut (dyn RngCore + Send),
324        clock: &dyn Clock,
325        job: J,
326    ) -> Result<(), Self::Error> {
327        // Grab the span context from the current span
328        let span = tracing::Span::current();
329        let ctx = span.context();
330        let span = ctx.span();
331        let span_context = span.span_context();
332
333        let metadata = JobMetadata::new(span_context);
334        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
335
336        let payload = serde_json::to_value(job).expect("Could not serialize job");
337        self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
338            .await
339    }
340
341    #[tracing::instrument(
342        name = "db.queue_job.schedule_job_later",
343        fields(
344            queue_job.queue_name = J::QUEUE_NAME,
345        ),
346        skip_all,
347    )]
348    async fn schedule_job_later<J: InsertableJob>(
349        &mut self,
350        rng: &mut (dyn RngCore + Send),
351        clock: &dyn Clock,
352        job: J,
353        scheduled_at: DateTime<Utc>,
354    ) -> Result<(), Self::Error> {
355        // Grab the span context from the current span
356        let span = tracing::Span::current();
357        let ctx = span.context();
358        let span = ctx.span();
359        let span_context = span.span_context();
360
361        let metadata = JobMetadata::new(span_context);
362        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
363
364        let payload = serde_json::to_value(job).expect("Could not serialize job");
365        self.schedule_later(
366            rng,
367            clock,
368            J::QUEUE_NAME,
369            payload,
370            metadata,
371            scheduled_at,
372            None,
373        )
374        .await
375    }
376}