找回密码
 立即注册
首页 业界区 安全 基于 Postgres 实现一个 Job Queue

基于 Postgres 实现一个 Job Queue

赵淳美 2025-6-20 19:54:04
今天看到一篇赞美 Postgres 的文章:Postgres is Too Good (And Why That's Actually a Problem) ,显然是非常吸引眼球的,作者用 PG 实现了需要用到的所有微服务。
做 AFFiNE 我们是用 Redis 的 pub/sub 实现的 Job Queue,所以想参考一下用 Postgres 实现对比看看。
直接问 ChatGPT 怎样实现

基于 Postgres 的 LISTEN/NOTIFY 实现任务队列服务
https://chatgpt.com/share/685527b8-724c-8010-9e5a-1aac521a7acc
表结构

init.sql
  1. CREATE TABLE tasks (
  2.   "id" SERIAL PRIMARY KEY,
  3.   "type" TEXT NOT NULL,
  4.   "payload" JSONB,
  5.   "status" TEXT NOT NULL DEFAULT 'pending',
  6.   "created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
  7.   "updated_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
  8.   "processed_at" TIMESTAMPTZ(3)
  9. );
复制代码
Job Queue 基本操作

添加任务并广播通知
  1. INSERT INTO tasks (type, payload) VALUES ('send_email', '{"to": "test@example.com"}');
  2. NOTIFY task_queue, 'new_task';
复制代码
监听任务消息并拉取任务

监听任务消息
  1. await jobListener.query('LISTEN task_queue');
  2. jobListener.on('notification', async (msg) => {
  3.   if (msg.channel === 'task_queue') {
  4.     // 在这里拉取任务
  5.   }
  6. });
复制代码
拉取任务 SQL
  1. UPDATE tasks
  2. SET status = 'processing', processed_at = NOW(), updated_at = NOW()
  3. WHERE id = (
  4.   SELECT id FROM tasks WHERE status = 'pending'
  5.   ORDER BY created_at ASC LIMIT 1
  6.   FOR UPDATE SKIP LOCKED
  7. )
  8. RETURNING *;
复制代码
可运行的代码

先启动一个 Postgres 测试服务
  1. docker run --name pg-container-job-queue-demo \
  2.   -e POSTGRES_USER=postgres \
  3.   -e POSTGRES_PASSWORD=test123123 \
  4.   -e POSTGRES_DB=mydb \
  5.   -v $(pwd)/init.sql:/docker-entrypoint-initdb.d/init.sql \
  6.   -p 5432:5432 \
  7.   -d postgres:16
复制代码
启动 demo.ts 代码,每 5 秒触发一个任务,并打印结果
  1. node demo.ts
复制代码
完整代码

请移步 https://github.com/fengmk2/fengmk2.github.com/tree/master/blog/2025/job-queue-on-postgres
有爱

希望本文对你有用 _
原始文章地址:https://fengmk2.com/blog/2025/job-queue-on-postgres/README.html

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册