跳到主要内容 十分钟实战:使用 Supabase 构建商业级 AI 图像生成平台后端 | 极客日志
TypeScript Node.js SaaS AI
十分钟实战:使用 Supabase 构建商业级 AI 图像生成平台后端 如何使用 Supabase 快速构建商业级 AI 图像生成平台的后端基础设施。内容涵盖数据库设计、行级安全策略配置、用户认证系统、积分管理函数与视图创建、实时功能实现以及 API 路由和 Webhook 处理器的开发。通过结合 Next.js 与 Supabase,实现了安全的用户数据隔离、积分自动赠送、图像生成任务追踪及支付集成流程,为独立开发者提供了一套完整的商业化应用落地方案。
Elasticer 发布于 2026/4/6 更新于 2026/4/16 11 浏览🚀前言
前端界面构建完毕后,接下来需要为其提供后端支持。利用 Supabase,可声明式地快速搭建应用所需的数据、认证、存储和实时通信系统。以下是实现思路与关键细节。
🔎1.数据库设计
结合系统设计文档,规划出如表所示的核心数据表。
表名 主要字段 描述与关系 user_profiles id, user_id, name, avatar_url, preferences, created_at, updated_at 存储用户的详细资料和偏好设置,与 Supabase 内置的 auth.users 表关联 generation_tasks id, user_id, task_type, prompt, negative_prompt, status, replicate_id, cost_credits, created_at 记录用户每一次发起的生成请求及当前状态(pending, processing, completed, failed) user_images id, user_id, task_id, image_url, thumbnail_url, metadata, created_at 存储成功生成的最终图像文件及其元数据,并与对应的生成请求关联 credit_transactions id, user_id, amount, type, description, related_id, created_at 记录所有积分变动(消费、充值、赠送等),便于审计和用户查询 orders id, user_id, stripe_session_id, amount, status, credits_amount, created_at, updated_at 存储用户购买积分的订单信息,与 Stripe 支付系统集成 stripe_subscriptions id, user_id, stripe_subscription_id, status, current_period_start, current_period_end, created_at 管理用户订阅状态,支持定期自动充值
这种将'生成请求'(generation_tasks 表)与'最终成果'(user_images 表)分离的设计,是一种健壮的数据建模方式。它能追踪每一次尝试(无论成败),这对后续调试和用户行为分析至关重要。
🔎2.用户认证系统
在 Supabase 仪表盘中点击 Authentication → Sign In/Providers,在右侧界面的 Auth Provider 区域启用 Email,也可以根据需求一键开启 Google、GitHub 等社交账户登录方式。
随后为所有表启用并配置行级安全(RLS)策略——这是 Supabase 的核心安全特性,能将数据访问控制下沉到数据库层面。我们添加的策略为'用户只能查看和操作 user_id 与自身认证 uid 匹配的记录'。这样一来,即使用户绕过前端直接调用 API,数据库也能阻止其窥探或篡改他人数据。
🔎3.数据库视图和函数
为了简化前端的数据获取,我们创建了几个有用的数据库视图:
user_stats_view:显示用户的生成统计、积分余额等汇总信息。
monthly_stats_view:按月份统计用户的生成数量和积分消费。
credit_consumption_view:分析用户的积分消费模式。
generation_type_distribution_view:统计不同生成类型的分布。
同时,创建了数据库函数来处理复杂的业务逻辑,核心函数如下:
CREATE OR REPLACE FUNCTION update_user_credits(
p_user_id UUID,
p_amount ,
p_type TEXT,
p_description TEXT,
p_related_id UUID
) void $$
credit_transactions(
user_id, amount, type, description, related_id
) (
p_user_id, p_amount, p_type, p_description, p_related_id
);
RAISE NOTICE , p_user_id,
p_amount p_amount, p_type, p_description;
;
$$ plpgsql SECURITY DEFINER;
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
INTEGER
DEFAULT
NULL
RETURNS
AS
BEGIN
INSERT INTO
VALUES
'User % credits updated: % (%) - %'
CASE
WHEN
>
0
THEN
'+'
ELSE
''
END
||
END
LANGUAGE
该函数可以安全地处理积分变动(消费、充值、赠送等),并自动记录交易历史。
🔎4.实时功能 为了在前端实时反馈耗时较长的图像生成进度,我们将借助 Supabase Realtime 的实时数据同步能力。前端的'生成工作台'页面通过 Supabase 客户端库,订阅 generation_tasks 表中当前用户的记录变更。当后端的 Webhook 处理器更新了某个任务的 status 字段时,这个变化会通过 WebSocket 实时推送到前端。
🔎5.实施步骤 现在,我们将上述设计转化为实际的代码和配置。以下是详细的实施步骤。
🦋5.1 数据库表结构创建与迁移 首先对现有数据库结构进行全面分析,这可以通过 Supabase 控制台或 psql 工具来完成。这样做的目的是查看当前的表结构,明确需要新增哪些表以及需要调整哪些字段。完成分析后,在 Supabase 控制台的 SQL 编辑器中执行以下 SQL 语句,创建所需的表:
CREATE TABLE user_profiles (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_id uuid NOT NULL REFERENCES auth.users(id),
name text,
avatar_url text,
preferences jsonb,
created_at timestamp with time zone DEFAULT now(),
updated_at timestamp with time zone DEFAULT now()
);
CREATE TABLE generation_tasks (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_id uuid NOT NULL REFERENCES auth.users(id),
task_type text NOT NULL ,
prompt text NOT NULL ,
negative_prompt text,
status text DEFAULT 'pending' ,
replicate_id text,
cost_credits integer DEFAULT 1 ,
created_at timestamp with time zone DEFAULT now()
);
CREATE TABLE user_images (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_id uuid NOT NULL REFERENCES auth.users(id),
task_id uuid REFERENCES generation_tasks(id),
image_url text NOT NULL ,
thumbnail_url text,
metadata jsonb,
created_at timestamp with time zone DEFAULT now()
);
CREATE TABLE credit_transactions (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_id uuid NOT NULL REFERENCES auth.users(id),
amount integer NOT NULL ,
type text NOT NULL ,
description text,
related_id uuid,
created_at timestamp with time zone DEFAULT now()
);
CREATE TABLE orders (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_id uuid NOT NULL REFERENCES auth.users(id),
stripe_session_id text,
amount integer NOT NULL ,
status text DEFAULT 'pending' ,
credits_amount integer NOT NULL ,
created_at timestamp with time zone DEFAULT now(),
updated_at timestamp with time zone DEFAULT now()
);
CREATE TABLE stripe_subscriptions (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_id uuid NOT NULL REFERENCES auth.users(id),
stripe_subscription_id text NOT NULL ,
status text DEFAULT 'active' ,
current_period_start timestamp with time zone,
current_period_end timestamp with time zone,
created_at timestamp with time zone DEFAULT now()
);
接下来需要启用行级安全(RLS)策略,并创建相应的安全策略,具体操作如下:
ALTER TABLE user_profiles ENABLE ROW LEVEL SECURITY;
ALTER TABLE generation_tasks ENABLE ROW LEVEL SECURITY;
ALTER TABLE user_images ENABLE ROW LEVEL SECURITY;
ALTER TABLE credit_transactions ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE stripe_subscriptions ENABLE ROW LEVEL SECURITY;
CREATE POLICY "Users can view own profile" ON user_profiles FOR SELECT USING (auth.uid() = user_id);
CREATE POLICY "Users can update own profile" ON user_profiles FOR UPDATE USING (auth.uid() = user_id);
CREATE POLICY "Users can view own tasks" ON generation_tasks FOR ALL USING (auth.uid() = user_id);
CREATE POLICY "Users can view own images" ON user_images FOR ALL USING (auth.uid() = user_id);
CREATE POLICY "Users can view own transactions" ON credit_transactions FOR ALL USING (auth.uid() = user_id);
CREATE POLICY "Users can view own orders" ON orders FOR ALL USING (auth.uid() = user_id);
CREATE POLICY "Users can view own subscriptions" ON stripe_subscriptions FOR ALL USING (auth.uid() = user_id);
🦋5.2 创建数据库视图和函数 为了更高效地管理和查询数据,需要创建相应的数据库视图和函数。首先是用户统计视图的创建,通过以下 SQL 语句可以生成一个整合了用户基本信息与相关统计数据的视图:
CREATE VIEW user_stats_view AS
SELECT
u.id as user_id,
u.email,
up.name,
COUNT (DISTINCT gt.id) as total_generations,
COUNT (DISTINCT ui.id) as total_images,
COALESCE (SUM (ct.amount), 0 ) as credit_balance
FROM auth.users u
LEFT JOIN user_profiles up ON u.id = up.user_id
LEFT JOIN generation_tasks gt ON u.id = gt.user_id
LEFT JOIN user_images ui ON u.id = ui.user_id
LEFT JOIN credit_transactions ct ON u.id = ct.user_id
GROUP BY u.id, u.email, up.name;
接下来需要创建积分管理相关的函数,以实现积分的更新和查询功能。首先是更新用户积分的函数,该函数会插入积分交易记录并记录操作日志,具体代码如下:
CREATE OR REPLACE FUNCTION update_user_credits(
p_user_id UUID,
p_amount INTEGER ,
p_type TEXT,
p_description TEXT,
p_related_id UUID DEFAULT NULL
) RETURNS void AS $$
BEGIN
INSERT INTO credit_transactions(
user_id, amount, type, description, related_id
) VALUES (
p_user_id, p_amount, p_type, p_description, p_related_id
);
RAISE NOTICE 'User % credits updated: % (%) - %' , p_user_id,
CASE WHEN p_amount > 0 THEN '+' ELSE '' END || p_amount, p_type, p_description;
END ;
$$ LANGUAGE plpgsql SECURITY DEFINER;
CREATE OR REPLACE FUNCTION get_user_credit_balance(p_user_id UUID) RETURNS INTEGER AS $$
DECLARE balance INTEGER ;
BEGIN
SELECT COALESCE (SUM (amount), 0 ) INTO balance FROM credit_transactions WHERE user_id = p_user_id;
RETURN balance;
END ;
$$ LANGUAGE plpgsql SECURITY DEFINER;
🦋5.3 配置 Supabase Storage Supabase Storage 的配置主要包含存储桶的创建与存储策略的设置两部分。
创建存储桶 :在 Supabase 控制台的 Storage 页面,点击 Create a new bucket 按钮,输入桶名称 user-images,在权限设置中选择 Public(允许公开读取存储桶内的文件),最后点击 Create bucket 完成创建。
设置存储策略 :在存储桶创建完成后,需配置对应的存储策略以保障用户数据的访问安全与权限隔离。在 Supabase 控制台的 SQL 编辑器中执行以下 SQL 语句,可实现两项核心策略:允许用户仅能上传文件到以自身 ID 命名的专属文件夹;允许用户仅能查看自身文件夹内的文件。
CREATE POLICY "Users can upload to own folder" ON storage.objects FOR INSERT WITH CHECK (
bucket_id = 'user-images' AND (storage.foldername(name))[1 ] = auth.uid()::text
);
CREATE POLICY "Users can view own files" ON storage.objects FOR SELECT USING (
bucket_id = 'user-images' AND (storage.foldername(name))[1 ] = auth.uid()::text
);
🦋5.4 新用户自动赠送积分 要实现新用户自动赠送积分功能,需先创建触发器函数及对应的触发器。通过以下 SQL 创建 handle_new_user 函数,该函数会在新用户注册时,向 user_profiles 表插入用户资料(关联新用户 ID、提取元数据中的用户名、初始化偏好设置为空值),同时调用 update_user_credits 函数向注册的新用户赠送 10 个积分;再创建 on_auth_user_created 触发器,使其在 auth.users 表插入新记录后,自动执行该函数。具体的 SQL 代码如下:
CREATE OR REPLACE FUNCTION handle_new_user() RETURNS trigger AS $$
BEGIN
INSERT INTO user_profiles(user_id, name, preferences)
VALUES (NEW.id, NEW.raw_user_meta_data- >> 'full_name' , '{}' );
PERFORM update_user_credits(
NEW.id, 10 , 'bonus' , '新用户注册奖励'
);
RETURN NEW ;
END ;
$$ LANGUAGE plpgsql SECURITY DEFINER;
CREATE TRIGGER on_auth_user_created AFTER INSERT ON auth.users FOR EACH ROW EXECUTE FUNCTION handle_new_user();
🦋5.5 测试数据库功能 测试数据库功能需重点验证积分管理函数与用户统计视图的可用性,具体测试逻辑如下:先调用 update_user_credits 函数(替换实际用户的 ID)测试积分更新功能是否正常;随后通过 get_user_credit_balance 函数查询积分余额,验证积分变动是否准确;最后借助 user_stats_view 视图查看用户的生成任务数、图片总数及当前积分余额等统计数据,确保数据汇总逻辑无误。具体的 SQL 代码如下:
SELECT update_user_credits('your-user-id-here' , 5 , 'test' , '测试积分更新' );
SELECT get_user_credit_balance('your-user-id-here' );
SELECT * FROM user_stats_view WHERE user_id = 'your-user-id-here' ;
通过以上步骤,我们成功创建了完整的数据库基础设施。接下来实现 API 路由和前端集成。
🦋5.6 创建图像生成 API 路由 在 app/api/generate/route.ts 中实现图像生成 API 路由。该路由通过 TypeScript 代码完成用户身份验证、解析请求参数(校验提示词)、检查积分余额与生成频率限制;随后调用 Replicate API(配置模型参数,支持图生图),在 generation_tasks 表记录任务;最后返回任务信息或错误提示。具体代码如下:
import { NextRequest , NextResponse } from "next/server" ;
import { createRouteHandlerClient } from "@supabase/auth-helpers-nextjs" ;
import { cookies } from "next/headers" ;
import Replicate from "replicate" ;
const replicate = new Replicate ({ auth : process.env .REPLICATE_API_TOKEN });
export async function POST (request : NextRequest ) {
try {
const supabase = createRouteHandlerClient ({ cookies });
const { data : { user }, error : authError } = await supabase.auth .getUser ();
if (authError || !user) {
return NextResponse .json ({ error : "未授权访问" }, { status : 401 });
}
const { prompt, negativePrompt, taskType, imageUrls } = await request.json ();
if (!prompt) {
return NextResponse .json ({ error : "提示词不能为空" }, { status : 400 });
}
const { data : balance } = await supabase.rpc ("get_user_credit_balance" , { p_user_id : user.id });
if ((balance || 0 ) <= 0 ) {
return NextResponse .json ({ error : "积分余额不足" }, { status : 402 });
}
const { data : rateLimitCheck } = await supabase.rpc ("check_user_rate_limit" , { p_user_id : user.id , p_max_daily_generations : 50 });
if (!rateLimitCheck) {
return NextResponse .json ({ error : "今日生成次数已达上限,请明天再试" }, { status : 429 });
}
const modelInput : any = {
prompt : prompt,
negative_prompt : negativePrompt || "" ,
width : 1024 ,
height : 1024 ,
num_outputs : 1 ,
scheduler : "K_EULER" ,
num_inference_steps : 50 ,
guidance_scale : 7.5 ,
seed : Math .floor (Math .random () * 1000000 ),
};
if (imageUrls && imageUrls.length > 0 ) {
modelInput.image = imageUrls[0 ];
}
const output = await replicate.run ("stability-ai/sdx1:39ed52f2a78e934b3ba6e2a89f5b1c712de7dfea5355255b1aa35c5565e08b" , {
input : modelInput,
webhook : `${process.env.NEXT_PUBLIC_APP_URL} /api/webhook/replicate` ,
webhook_events_filter : ["completed" ],
});
const { data : task, error : taskError } = await supabase
.from ("generation_tasks" )
.insert ({
user_id : user.id ,
task_type : taskType || "text_to_image" ,
prompt : prompt,
negative_prompt : negativePrompt,
status : "processing" ,
replicate_id : output.id ,
cost_credits : 1 ,
})
.select ()
.single ();
if (taskError) {
console .error ("创建任务失败:" , taskError);
return NextResponse .json ({ error : "创建任务失败" }, { status : 500 });
}
return NextResponse .json ({
success : true ,
task_id : task.id ,
replicate_id : output.id ,
message : "任务已创建,正在处理中..." ,
});
} catch (error) {
console .error ("生成请求失败:" , error);
return NextResponse .json ({ error : "服务器内部错误" }, { status : 500 });
}
}
🦋5.7 创建 Webhook 处理器 在 app/api/webhook/replicate/route.ts 中实现 Webhook 处理器,通过 TypeScript 代码解析 Replicate 的任务通知,查找对应的本地任务:任务成功则下载图片并上传至 Supabase Storage,记录图片信息、更新任务状态并扣除积分;任务失败则更新任务状态为失败,同时返回对应的处理结果或错误信息(生产环境需补充请求来源验证)。具体代码如下:
import { NextRequest , NextResponse } from "next/server" ;
import { createRouteHandlerClient } from "@supabase/auth-helpers-nextjs" ;
import { cookies } from "next/headers" ;
export async function POST (request : NextRequest ) {
try {
const webhookData = await request.json ();
const { id : replicateId, status, output, error } = webhookData;
if (!replicateId) {
return NextResponse .json ({ error : "缺少 replicate_id" }, { status : 400 });
}
const supabase = createRouteHandlerClient ({ cookies });
const { data : task, error : taskError } = await supabase
.from ("generation_tasks" )
.select ("*" )
.eq ("replicate_id" , replicateId)
.single ();
if (taskError || !task) {
console .error ("找不到对应任务:" , replicateId);
return NextResponse .json ({ error : "任务不存在" }, { status : 404 });
}
if (status === "succeeded" && output && output.length > 0 ) {
const imageUrl = output[0 ];
const imageResponse = await fetch (imageUrl);
const imageBlob = await imageResponse.blob ();
const fileName = `${task.user_id} /${Date .now()} _${Math .random().toString(36 ).substr(2 , 9 )} .png` ;
const { data : uploadData, error : uploadError } = await supabase.storage
.from ("user-images" )
.upload (fileName, imageBlob, {
contentType : "image/png" ,
cacheControl : "3600" ,
});
if (uploadError) {
console .error ("图片上传失败:" , uploadError);
await supabase
.from ("generation_tasks" )
.update ({ status : "failed" })
.eq ("id" , task.id );
return NextResponse .json ({ error : "图片上传失败" }, { status : 500 });
}
const { data : { publicUrl } } = supabase.storage .from ("user-images" ).getPublicUrl (fileName);
const { error : imageError } = await supabase.from ("user_images" ).insert ({
user_id : task.user_id ,
task_id : task.id ,
image_url : publicUrl,
metadata : {
original_url : imageUrl,
file_name : fileName,
file_size : imageBlob.size ,
mime_type : imageBlob.type ,
},
});
if (imageError) {
console .error ("创建图片记录失败:" , imageError);
}
await supabase
.from ("generation_tasks" )
.update ({ status : "completed" })
.eq ("id" , task.id );
const { error : creditError } = await supabase.rpc ("consume_user_credits" , {
p_user_id : task.user_id ,
p_credits_to_consume : task.cost_credits ,
p_description : `图像生成:${task.task_type} ` ,
p_related_id : task.id ,
});
if (creditError) {
console .error ("扣除积分失败:" , creditError);
}
return NextResponse .json ({
success : true ,
message : "任务处理完成" ,
image_url : publicUrl,
});
} else {
await supabase
.from ("generation_tasks" )
.update ({
status : "failed" ,
})
.eq ("id" , task.id );
return NextResponse .json ({
success : false ,
message : "任务处理失败" ,
error : error || "未知错误" ,
});
}
} catch (error) {
console .error ("Webhook 处理失败:" , error);
return NextResponse .json ({ error : "服务器内部错误" }, { status : 500 });
}
}
🦋5.8 配置环境变量和安装依赖 配置环境变量和安装依赖是项目运行的基础准备工作。首先在项目根目录创建 .env.local 文件,用于存储各类环境变量,包括 Supabase 的项目 URL、匿名密钥和服务角色密钥、Replicate 的 API 令牌及 Webhook 签名密钥,以及应用的基础 URL 等配置信息。具体内容如下:
NEXT_PUBLIC_SUPABASE_URL=your_supabase_project_url
NEXT_PUBLIC_SUPABASE_ANON_KEY=your_supabase_anon_key
SUPABASE_SERVICE_ROLE_KEY=your_supabase_service_role_key
REPLICATE_API_TOKEN=your_replicate_api_token
REPLICATE_WEBHOOK_SIGNING_SECRET=your_webhook_secret
NEXT_PUBLIC_APP_URL=http://localhost:3000
接着需要安装项目所需的必要依赖包,通过执行以下 Bash 命令,可安装 replicate、@supabase/auth-helpers-nextjs 和 @supabase/supabase-js 等包,为项目提供相应的功能支持:
npm install replicate @supabase/auth-helpers-nextjs @supabase/supabase-js
🦋5.9 测试 API 功能 API 功能测试需验证生成请求和 Webhook 是否正常工作。测试生成请求时,可使用 Postman 或 curl 工具,向 http://localhost:3000/api/generate 发送 POST 请求,在请求头中包含 Content-Type 和 Authorization 信息,在请求体中传入提示词和任务类型等参数。curl 命令示例如下:
curl -X POST http://localhost:3000/api/generate \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-d '{ "prompt": "一只可爱的小猫坐在花园里", "taskType": "text_to_image" }'
要测试 Webhook,则需在 Replicate 控制台中配置 Webhook URL 为 https://your-domain.com/api/webhook/replicate,以确保任务完成后能正确接收通知。
通过以上完整的实施步骤,我们成功构建了一个功能完备的后端系统,包括完整的数据库基础设施、安全的积分管理系统、高效的图像生成 API、可靠的异步任务处理以及完善的错误处理和日志记录。