mas_storage/queue/job.rs
1// Copyright 2024 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6//! Repository to interact with jobs in the job queue
7
8use async_trait::async_trait;
9use chrono::{DateTime, Duration, Utc};
10use opentelemetry::trace::TraceContextExt;
11use rand_core::RngCore;
12use serde::{Deserialize, Serialize};
13use tracing_opentelemetry::OpenTelemetrySpanExt;
14use ulid::Ulid;
15
16use super::Worker;
17use crate::{Clock, repository_impl};
18
19/// Represents a job in the job queue
20pub struct Job {
21 /// The ID of the job
22 pub id: Ulid,
23
24 /// The queue on which the job was placed
25 pub queue_name: String,
26
27 /// The payload of the job
28 pub payload: serde_json::Value,
29
30 /// Arbitrary metadata about the job
31 pub metadata: JobMetadata,
32
33 /// Which attempt it is
34 pub attempt: usize,
35}
36
37/// Metadata stored alongside the job
38#[derive(Serialize, Deserialize, Default, Clone, Debug)]
39pub struct JobMetadata {
40 #[serde(default)]
41 trace_id: String,
42
43 #[serde(default)]
44 span_id: String,
45
46 #[serde(default)]
47 trace_flags: u8,
48}
49
50impl JobMetadata {
51 fn new(span_context: &opentelemetry::trace::SpanContext) -> Self {
52 Self {
53 trace_id: span_context.trace_id().to_string(),
54 span_id: span_context.span_id().to_string(),
55 trace_flags: span_context.trace_flags().to_u8(),
56 }
57 }
58
59 /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`]
60 #[must_use]
61 pub fn span_context(&self) -> opentelemetry::trace::SpanContext {
62 use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
63 SpanContext::new(
64 TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID),
65 SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID),
66 TraceFlags::new(self.trace_flags),
67 // Trace context is remote, as it comes from another service/from the database
68 true,
69 TraceState::NONE,
70 )
71 }
72}
73
74/// A trait that represents a job which can be inserted into a queue
75pub trait InsertableJob: Serialize + Send {
76 /// The name of the queue this job belongs to
77 const QUEUE_NAME: &'static str;
78}
79
80/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
81/// worker.
82#[async_trait]
83pub trait QueueJobRepository: Send + Sync {
84 /// The error type returned by the repository.
85 type Error;
86
87 /// Schedule a job to be executed as soon as possible by a worker.
88 ///
89 /// # Parameters
90 ///
91 /// * `rng` - The random number generator used to generate a new job ID
92 /// * `clock` - The clock used to generate timestamps
93 /// * `queue_name` - The name of the queue to schedule the job on
94 /// * `payload` - The payload of the job
95 /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
96 ///
97 /// # Errors
98 ///
99 /// Returns an error if the underlying repository fails.
100 async fn schedule(
101 &mut self,
102 rng: &mut (dyn RngCore + Send),
103 clock: &dyn Clock,
104 queue_name: &str,
105 payload: serde_json::Value,
106 metadata: serde_json::Value,
107 ) -> Result<(), Self::Error>;
108
109 /// Schedule a job to be executed at a later date by a worker.
110 ///
111 /// # Parameters
112 ///
113 /// * `rng` - The random number generator used to generate a new job ID
114 /// * `clock` - The clock used to generate timestamps
115 /// * `queue_name` - The name of the queue to schedule the job on
116 /// * `payload` - The payload of the job
117 /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
118 /// * `scheduled_at` - The date and time to schedule the job for
119 /// * `schedule_name` - The name of the recurring schedule which scheduled
120 /// this job
121 ///
122 /// # Errors
123 ///
124 /// Returns an error if the underlying repository fails.
125 #[allow(clippy::too_many_arguments)]
126 async fn schedule_later(
127 &mut self,
128 rng: &mut (dyn RngCore + Send),
129 clock: &dyn Clock,
130 queue_name: &str,
131 payload: serde_json::Value,
132 metadata: serde_json::Value,
133 scheduled_at: DateTime<Utc>,
134 schedule_name: Option<&str>,
135 ) -> Result<(), Self::Error>;
136
137 /// Reserve multiple jobs from multiple queues
138 ///
139 /// # Parameters
140 ///
141 /// * `clock` - The clock used to generate timestamps
142 /// * `worker` - The worker that is reserving the jobs
143 /// * `queues` - The queues to reserve jobs from
144 /// * `count` - The number of jobs to reserve
145 ///
146 /// # Errors
147 ///
148 /// Returns an error if the underlying repository fails.
149 async fn reserve(
150 &mut self,
151 clock: &dyn Clock,
152 worker: &Worker,
153 queues: &[&str],
154 count: usize,
155 ) -> Result<Vec<Job>, Self::Error>;
156
157 /// Mark a job as completed
158 ///
159 /// # Parameters
160 ///
161 /// * `clock` - The clock used to generate timestamps
162 /// * `id` - The ID of the job to mark as completed
163 ///
164 /// # Errors
165 ///
166 /// Returns an error if the underlying repository fails.
167 async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
168
169 /// Marks a job as failed.
170 ///
171 /// # Parameters
172 ///
173 /// * `clock` - The clock used to generate timestamps
174 /// * `id` - The ID of the job to mark as failed
175 /// * `reason` - The reason for the failure
176 ///
177 /// # Errors
178 ///
179 /// Returns an error if the underlying repository fails.
180 async fn mark_as_failed(
181 &mut self,
182 clock: &dyn Clock,
183 id: Ulid,
184 reason: &str,
185 ) -> Result<(), Self::Error>;
186
187 /// Retry a job.
188 ///
189 /// # Parameters
190 ///
191 /// * `rng` - The random number generator used to generate a new job ID
192 /// * `clock` - The clock used to generate timestamps
193 /// * `id` - The ID of the job to reschedule
194 ///
195 /// # Errors
196 ///
197 /// Returns an error if the underlying repository fails.
198 async fn retry(
199 &mut self,
200 rng: &mut (dyn RngCore + Send),
201 clock: &dyn Clock,
202 id: Ulid,
203 delay: Duration,
204 ) -> Result<(), Self::Error>;
205
206 /// Mark all scheduled jobs past their scheduled date as available to be
207 /// executed.
208 ///
209 /// Returns the number of jobs that were marked as available.
210 ///
211 /// # Errors
212 ///
213 /// Returns an error if the underlying repository fails.
214 async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
215}
216
217repository_impl!(QueueJobRepository:
218 async fn schedule(
219 &mut self,
220 rng: &mut (dyn RngCore + Send),
221 clock: &dyn Clock,
222 queue_name: &str,
223 payload: serde_json::Value,
224 metadata: serde_json::Value,
225 ) -> Result<(), Self::Error>;
226
227 async fn schedule_later(
228 &mut self,
229 rng: &mut (dyn RngCore + Send),
230 clock: &dyn Clock,
231 queue_name: &str,
232 payload: serde_json::Value,
233 metadata: serde_json::Value,
234 scheduled_at: DateTime<Utc>,
235 schedule_name: Option<&str>,
236 ) -> Result<(), Self::Error>;
237
238 async fn reserve(
239 &mut self,
240 clock: &dyn Clock,
241 worker: &Worker,
242 queues: &[&str],
243 count: usize,
244 ) -> Result<Vec<Job>, Self::Error>;
245
246 async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
247
248 async fn mark_as_failed(&mut self,
249 clock: &dyn Clock,
250 id: Ulid,
251 reason: &str,
252 ) -> Result<(), Self::Error>;
253
254 async fn retry(
255 &mut self,
256 rng: &mut (dyn RngCore + Send),
257 clock: &dyn Clock,
258 id: Ulid,
259 delay: Duration,
260 ) -> Result<(), Self::Error>;
261
262 async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
263);
264
265/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
266/// through the [`InsertableJob`] trait. This isn't in the
267/// [`QueueJobRepository`] trait to keep it object safe.
268#[async_trait]
269pub trait QueueJobRepositoryExt: QueueJobRepository {
270 /// Schedule a job to be executed as soon as possible by a worker.
271 ///
272 /// # Parameters
273 ///
274 /// * `rng` - The random number generator used to generate a new job ID
275 /// * `clock` - The clock used to generate timestamps
276 /// * `job` - The job to schedule
277 ///
278 /// # Errors
279 ///
280 /// Returns an error if the underlying repository fails.
281 async fn schedule_job<J: InsertableJob>(
282 &mut self,
283 rng: &mut (dyn RngCore + Send),
284 clock: &dyn Clock,
285 job: J,
286 ) -> Result<(), Self::Error>;
287
288 /// Schedule a job to be executed at a later date by a worker.
289 ///
290 /// # Parameters
291 ///
292 /// * `rng` - The random number generator used to generate a new job ID
293 /// * `clock` - The clock used to generate timestamps
294 /// * `job` - The job to schedule
295 /// * `scheduled_at` - The date and time to schedule the job for
296 ///
297 /// # Errors
298 ///
299 /// Returns an error if the underlying repository fails.
300 async fn schedule_job_later<J: InsertableJob>(
301 &mut self,
302 rng: &mut (dyn RngCore + Send),
303 clock: &dyn Clock,
304 job: J,
305 scheduled_at: DateTime<Utc>,
306 ) -> Result<(), Self::Error>;
307}
308
309#[async_trait]
310impl<T> QueueJobRepositoryExt for T
311where
312 T: QueueJobRepository,
313{
314 #[tracing::instrument(
315 name = "db.queue_job.schedule_job",
316 fields(
317 queue_job.queue_name = J::QUEUE_NAME,
318 ),
319 skip_all,
320 )]
321 async fn schedule_job<J: InsertableJob>(
322 &mut self,
323 rng: &mut (dyn RngCore + Send),
324 clock: &dyn Clock,
325 job: J,
326 ) -> Result<(), Self::Error> {
327 // Grab the span context from the current span
328 let span = tracing::Span::current();
329 let ctx = span.context();
330 let span = ctx.span();
331 let span_context = span.span_context();
332
333 let metadata = JobMetadata::new(span_context);
334 let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
335
336 let payload = serde_json::to_value(job).expect("Could not serialize job");
337 self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
338 .await
339 }
340
341 #[tracing::instrument(
342 name = "db.queue_job.schedule_job_later",
343 fields(
344 queue_job.queue_name = J::QUEUE_NAME,
345 ),
346 skip_all,
347 )]
348 async fn schedule_job_later<J: InsertableJob>(
349 &mut self,
350 rng: &mut (dyn RngCore + Send),
351 clock: &dyn Clock,
352 job: J,
353 scheduled_at: DateTime<Utc>,
354 ) -> Result<(), Self::Error> {
355 // Grab the span context from the current span
356 let span = tracing::Span::current();
357 let ctx = span.context();
358 let span = ctx.span();
359 let span_context = span.span_context();
360
361 let metadata = JobMetadata::new(span_context);
362 let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
363
364 let payload = serde_json::to_value(job).expect("Could not serialize job");
365 self.schedule_later(
366 rng,
367 clock,
368 J::QUEUE_NAME,
369 payload,
370 metadata,
371 scheduled_at,
372 None,
373 )
374 .await
375 }
376}