赵淳美 发表于 2025-6-20 19:54:04

基于 Postgres 实现一个 Job Queue

今天看到一篇赞美 Postgres 的文章:Postgres is Too Good (And Why That's Actually a Problem) ,显然是非常吸引眼球的,作者用 PG 实现了需要用到的所有微服务。
做 AFFiNE 我们是用 Redis 的 pub/sub 实现的 Job Queue,所以想参考一下用 Postgres 实现对比看看。
直接问 ChatGPT 怎样实现

基于 Postgres 的 LISTEN/NOTIFY 实现任务队列服务
https://chatgpt.com/share/685527b8-724c-8010-9e5a-1aac521a7acc
表结构

init.sql
CREATE TABLE tasks (
"id" SERIAL PRIMARY KEY,
"type" TEXT NOT NULL,
"payload" JSONB,
"status" TEXT NOT NULL DEFAULT 'pending',
"created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"processed_at" TIMESTAMPTZ(3)
);Job Queue 基本操作

添加任务并广播通知

INSERT INTO tasks (type, payload) VALUES ('send_email', '{"to": "test@example.com"}');

NOTIFY task_queue, 'new_task';监听任务消息并拉取任务

监听任务消息
await jobListener.query('LISTEN task_queue');

jobListener.on('notification', async (msg) => {
if (msg.channel === 'task_queue') {
    // 在这里拉取任务
}
});拉取任务 SQL
UPDATE tasks
SET status = 'processing', processed_at = NOW(), updated_at = NOW()
WHERE id = (
SELECT id FROM tasks WHERE status = 'pending'
ORDER BY created_at ASC LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *;可运行的代码

先启动一个 Postgres 测试服务
docker run --name pg-container-job-queue-demo \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=test123123 \
-e POSTGRES_DB=mydb \
-v $(pwd)/init.sql:/docker-entrypoint-initdb.d/init.sql \
-p 5432:5432 \
-d postgres:16启动 demo.ts 代码,每 5 秒触发一个任务,并打印结果
node demo.ts完整代码

请移步 https://github.com/fengmk2/fengmk2.github.com/tree/master/blog/2025/job-queue-on-postgres
有爱

希望本文对你有用 _
原始文章地址:https://fengmk2.com/blog/2025/job-queue-on-postgres/README.html

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 基于 Postgres 实现一个 Job Queue