Drizzle ORM 完全指南:从入门到精通
前言
在现代 TypeScript 开发中,如何优雅地处理数据库操作一直是个重要话题。今天我要介绍的 Drizzle ORM,是一个新兴但极具潜力的解决方案。它不仅提供了完整的类型安全,还保持了接近原生 SQL 的性能和灵活性。
什么是 Drizzle?
Drizzle 是一个 TypeScript 优先、轻量级、高性能 的 ORM。与其说它是 ORM(对象关系映射),不如说它是一个"类型安全的 SQL 查询构建器"。
核心特点
- 🚀 零运行时开销:查询在构建时编译成 SQL
- 🔒 100% 类型安全:从模式定义到查询结果
- 📦 轻量级:核心包仅 ~50KB
- 🎯 SQL-like 语法:学习成本低
- 🔧 灵活性高:支持复杂查询和原生 SQL
为什么选择 Drizzle?
1. 类型安全的极致体验
typescript
// 定义模式时,TypeScript 会记住每个字段
const users = pgTable('users', {
id: serial('id').primaryKey(),
name: text('name').notNull(),
email: text('email').notNull().unique(),
age: integer('age'),
createdAt: timestamp('created_at').defaultNow()
})
// 查询时,IDE 会提供完整的自动补全
const result = await db
.select()
.from(users)
.where(eq(users.email, 'test@example.com')) // ✅ email 字段存在
// .where(eq(users.emial, '...')) // ❌ 拼写错误会立即报错
2. 性能对比
typescript
// Drizzle - 直接生成 SQL,无额外开销
const users = await db.select().from(users).where(eq(users.age, 18))
// 生成: SELECT * FROM users WHERE age = $1
// Prisma - 需要经过 ORM 层处理
const users = await prisma.user.findMany({ where: { age: 18 } })
// 需要解析、转换、验证等步骤
快速开始
安装
bash
# 安装核心包
bun add drizzle-orm postgres
bun add -d drizzle-kit @types/pg
# 或使用 npm/yarn/pnpm
npm install drizzle-orm postgres
npm install -D drizzle-kit @types/pg
基础配置
- 创建配置文件
drizzle.config.ts
:
typescript
import { defineConfig } from 'drizzle-kit'
export default defineConfig({
schema: './src/db/schema.ts',
out: './drizzle',
dialect: 'postgresql',
dbCredentials: {
url: process.env.DATABASE_URL!,
},
})
- 设置数据库连接
src/db/index.ts
:
typescript
import { drizzle } from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
const connectionString = process.env.DATABASE_URL!
const client = postgres(connectionString)
export const db = drizzle(client)
核心概念详解
1. Schema 定义
typescript
import { pgTable, serial, text, integer, timestamp, boolean, uuid, decimal, jsonb } from 'drizzle-orm/pg-core'
// 用户表
export const users = pgTable('users', {
id: uuid('id').primaryKey().defaultRandom(),
email: text('email').notNull().unique(),
name: text('name').notNull(),
age: integer('age'),
isActive: boolean('is_active').default(true),
metadata: jsonb('metadata'),
createdAt: timestamp('created_at').defaultNow(),
updatedAt: timestamp('updated_at').defaultNow(),
})
// 订单表
export const orders = pgTable('orders', {
id: serial('id').primaryKey(),
userId: uuid('user_id').notNull().references(() => users.id),
amount: decimal('amount', { precision: 10, scale: 2 }).notNull(),
status: text('status', { enum: ['pending', 'completed', 'cancelled'] }).notNull(),
createdAt: timestamp('created_at').defaultNow(),
})
// 多对多关系 - 用户角色表
export const usersToRoles = pgTable('users_to_roles', {
userId: uuid('user_id').notNull().references(() => users.id),
roleId: integer('role_id').notNull().references(() => roles.id),
}, (table) => ({
pk: primaryKey(table.userId, table.roleId), // 复合主键
}))
2. 基础查询操作
查询数据
typescript
// 1. 查询所有数据
const allUsers = await db.select().from(users)
// 2. 条件查询
const activeUsers = await db
.select()
.from(users)
.where(eq(users.isActive, true))
// 3. 多条件查询
const specificUsers = await db
.select()
.from(users)
.where(
and(
eq(users.isActive, true),
gte(users.age, 18),
like(users.email, '%@gmail.com')
)
)
// 4. 排序和分页
const paginatedUsers = await db
.select()
.from(users)
.orderBy(desc(users.createdAt))
.limit(10)
.offset(20)
// 5. 选择特定字段
const userEmails = await db
.select({
id: users.id,
email: users.email,
fullName: users.name, // 可以重命名
})
.from(users)
插入数据
typescript
// 1. 插入单条数据
const newUser = await db
.insert(users)
.values({
email: 'john@example.com',
name: 'John Doe',
age: 25,
})
.returning() // 返回插入的数据
// 2. 批量插入
const newUsers = await db
.insert(users)
.values([
{ email: 'user1@example.com', name: 'User 1' },
{ email: 'user2@example.com', name: 'User 2' },
{ email: 'user3@example.com', name: 'User 3' },
])
.returning()
// 3. 冲突处理
const upsertUser = await db
.insert(users)
.values({
email: 'existing@example.com',
name: 'Updated Name',
})
.onConflictDoUpdate({
target: users.email,
set: { name: 'Updated Name', updatedAt: new Date() },
})
更新数据
typescript
// 1. 更新单个字段
const updated = await db
.update(users)
.set({ isActive: false })
.where(eq(users.id, userId))
.returning()
// 2. 更新多个字段
const updatedUser = await db
.update(users)
.set({
name: 'New Name',
age: sql`${users.age} + 1`, // 使用 SQL 表达式
updatedAt: new Date(),
})
.where(eq(users.email, 'john@example.com'))
// 3. 条件更新
const deactivateOldUsers = await db
.update(users)
.set({ isActive: false })
.where(
and(
lt(users.createdAt, new Date('2023-01-01')),
eq(users.isActive, true)
)
)
删除数据
typescript
// 1. 删除特定记录
const deleted = await db
.delete(users)
.where(eq(users.id, userId))
.returning()
// 2. 批量删除
const deletedInactive = await db
.delete(users)
.where(eq(users.isActive, false))
3. 高级查询
联表查询
typescript
// 1. Inner Join
const usersWithOrders = await db
.select({
userName: users.name,
userEmail: users.email,
orderId: orders.id,
orderAmount: orders.amount,
})
.from(users)
.innerJoin(orders, eq(users.id, orders.userId))
// 2. Left Join
const usersWithOptionalOrders = await db
.select()
.from(users)
.leftJoin(orders, eq(users.id, orders.userId))
// 3. 多表联接
const complexQuery = await db
.select({
user: users,
order: orders,
role: roles,
})
.from(users)
.leftJoin(orders, eq(users.id, orders.userId))
.leftJoin(usersToRoles, eq(users.id, usersToRoles.userId))
.leftJoin(roles, eq(usersToRoles.roleId, roles.id))
.where(eq(users.isActive, true))
聚合查询
typescript
// 1. 计数
const userCount = await db
.select({ count: count() })
.from(users)
// 2. 分组统计
const orderStats = await db
.select({
userId: orders.userId,
totalAmount: sum(orders.amount),
orderCount: count(),
avgAmount: avg(orders.amount),
})
.from(orders)
.groupBy(orders.userId)
.having(gt(count(), 5)) // Having 子句
// 3. 复杂聚合
const monthlyRevenue = await db
.select({
month: sql<string>`DATE_TRUNC('month', ${orders.createdAt})`,
revenue: sum(orders.amount),
orderCount: count(),
})
.from(orders)
.where(eq(orders.status, 'completed'))
.groupBy(sql`DATE_TRUNC('month', ${orders.createdAt})`)
.orderBy(sql`DATE_TRUNC('month', ${orders.createdAt})`)
子查询
typescript
// 1. 在 WHERE 中使用子查询
const activeUsersWithOrders = await db
.select()
.from(users)
.where(
inArray(
users.id,
db.select({ id: orders.userId }).from(orders).where(eq(orders.status, 'completed'))
)
)
// 2. 在 SELECT 中使用子查询
const usersWithOrderCount = await db
.select({
...users,
orderCount: sql<number>`(
SELECT COUNT(*)
FROM ${orders}
WHERE ${orders.userId} = ${users.id}
)`,
})
.from(users)
4. 事务处理
typescript
// 1. 基础事务
const result = await db.transaction(async (tx) => {
// 创建用户
const [user] = await tx
.insert(users)
.values({ email: 'new@example.com', name: 'New User' })
.returning()
// 创建订单
const [order] = await tx
.insert(orders)
.values({ userId: user.id, amount: '99.99', status: 'pending' })
.returning()
return { user, order }
})
// 2. 带回滚的事务
async function transferMoney(fromUserId: string, toUserId: string, amount: number) {
try {
await db.transaction(async (tx) => {
// 扣除发送方余额
const [sender] = await tx
.update(users)
.set({ balance: sql`balance - ${amount}` })
.where(eq(users.id, fromUserId))
.returning()
if (sender.balance < 0) {
throw new Error('余额不足')
}
// 增加接收方余额
await tx
.update(users)
.set({ balance: sql`balance + ${amount}` })
.where(eq(users.id, toUserId))
// 记录交易
await tx.insert(transactions).values({
fromUserId,
toUserId,
amount,
type: 'transfer',
})
})
} catch (error) {
console.error('转账失败:', error)
throw error
}
}
5. 性能优化
Prepared Statements
typescript
// 1. 创建预编译语句
const getUserById = db
.select()
.from(users)
.where(eq(users.id, sql.placeholder('userId')))
.prepare('getUserById')
const getOrdersByStatus = db
.select()
.from(orders)
.where(eq(orders.status, sql.placeholder('status')))
.limit(sql.placeholder('limit'))
.prepare('getOrdersByStatus')
// 2. 复用预编译语句
const user1 = await getUserById.execute({ userId: 'uuid-1' })
const user2 = await getUserById.execute({ userId: 'uuid-2' })
const pendingOrders = await getOrdersByStatus.execute({
status: 'pending',
limit: 10
})
批量操作优化
typescript
// 1. 使用事务批量插入
async function bulkInsertUsers(userData: NewUser[]) {
const BATCH_SIZE = 1000
await db.transaction(async (tx) => {
for (let i = 0; i < userData.length; i += BATCH_SIZE) {
const batch = userData.slice(i, i + BATCH_SIZE)
await tx.insert(users).values(batch)
}
})
}
// 2. 使用 COPY 命令(PostgreSQL)
async function superFastBulkInsert(data: any[]) {
const values = data.map(d => `(${d.id}, '${d.name}', '${d.email}')`).join(',')
await db.execute(sql`
INSERT INTO ${users} (id, name, email)
VALUES ${sql.raw(values)}
`)
}
实战技巧
1. 类型推断和复用
typescript
// 从 schema 推断类型
export type User = typeof users.$inferSelect // 查询结果类型
export type NewUser = typeof users.$inferInsert // 插入数据类型
export type UserId = User['id'] // 单个字段类型
// 创建可复用的查询片段
const activeUsersBase = db
.select()
.from(users)
.where(eq(users.isActive, true))
// 基于片段构建不同查询
const activeUserEmails = await activeUsersBase
.select({ email: users.email })
const activeUserCount = await activeUsersBase
.select({ count: count() })
2. 动态查询构建
typescript
interface UserFilters {
name?: string
email?: string
isActive?: boolean
minAge?: number
}
async function searchUsers(filters: UserFilters) {
const conditions = []
if (filters.name) {
conditions.push(like(users.name, `%${filters.name}%`))
}
if (filters.email) {
conditions.push(eq(users.email, filters.email))
}
if (filters.isActive !== undefined) {
conditions.push(eq(users.isActive, filters.isActive))
}
if (filters.minAge) {
conditions.push(gte(users.age, filters.minAge))
}
return db
.select()
.from(users)
.where(conditions.length > 0 ? and(...conditions) : undefined)
}
3. 自定义 SQL 函数
typescript
// 1. 创建自定义函数
const lower = (column: AnyColumn) => sql<string>`lower(${column})`
const coalesce = <T>(column: SQL<T>, defaultValue: T) =>
sql<T>`coalesce(${column}, ${defaultValue})`
// 2. 使用自定义函数
const searchResults = await db
.select()
.from(users)
.where(like(lower(users.email), '%gmail.com'))
const usersWithDefaults = await db
.select({
id: users.id,
name: coalesce(users.name, 'Anonymous'),
age: coalesce(users.age, 0),
})
.from(users)
4. 迁移管理
typescript
// package.json scripts
{
"scripts": {
"db:generate": "drizzle-kit generate",
"db:push": "drizzle-kit push",
"db:migrate": "drizzle-kit migrate",
"db:studio": "drizzle-kit studio",
"db:seed": "tsx src/db/seed.ts"
}
}
// src/db/seed.ts
import { db } from './index'
import { users, roles } from './schema'
async function seed() {
console.log('🌱 开始填充数据...')
// 创建角色
const [adminRole, userRole] = await db
.insert(roles)
.values([
{ name: 'admin', permissions: ['read', 'write', 'delete'] },
{ name: 'user', permissions: ['read'] },
])
.returning()
// 创建用户
await db.insert(users).values([
{
email: 'admin@example.com',
name: 'Admin User',
roleId: adminRole.id,
},
{
email: 'user@example.com',
name: 'Regular User',
roleId: userRole.id,
},
])
console.log('✅ 数据填充完成!')
}
seed().catch(console.error)
与其他技术集成
1. 与 tRPC 集成
typescript
// server/routers/user.ts
import { z } from 'zod'
import { router, publicProcedure } from '../trpc'
import { db } from '@/db'
import { users } from '@/db/schema'
import { eq } from 'drizzle-orm'
export const userRouter = router({
list: publicProcedure
.input(z.object({
limit: z.number().min(1).max(100).default(10),
offset: z.number().min(0).default(0),
}))
.query(async ({ input }) => {
const result = await db
.select()
.from(users)
.limit(input.limit)
.offset(input.offset)
return result
}),
create: publicProcedure
.input(z.object({
email: z.string().email(),
name: z.string().min(1),
}))
.mutation(async ({ input }) => {
const [user] = await db
.insert(users)
.values(input)
.returning()
return user
}),
})
2. 与 Next.js App Router 集成
typescript
// app/api/users/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { db } from '@/db'
import { users } from '@/db/schema'
export async function GET(request: NextRequest) {
const searchParams = request.nextUrl.searchParams
const limit = parseInt(searchParams.get('limit') || '10')
const offset = parseInt(searchParams.get('offset') || '0')
const result = await db
.select()
.from(users)
.limit(limit)
.offset(offset)
return NextResponse.json(result)
}
export async function POST(request: NextRequest) {
const body = await request.json()
const [user] = await db
.insert(users)
.values(body)
.returning()
return NextResponse.json(user, { status: 201 })
}
常见问题和解决方案
1. 连接池配置
typescript
// 开发环境
const client = postgres(connectionString, {
max: 1, // 开发环境使用单连接
})
// 生产环境
const client = postgres(connectionString, {
max: 20, // 最大连接数
idle_timeout: 30, // 空闲超时
connect_timeout: 10, // 连接超时
})
2. 处理大量数据
typescript
// 使用游标处理大量数据
async function* getUsers() {
const batchSize = 1000
let offset = 0
while (true) {
const batch = await db
.select()
.from(users)
.limit(batchSize)
.offset(offset)
if (batch.length === 0) break
yield batch
offset += batchSize
}
}
// 使用
for await (const batch of getUsers()) {
await processBatch(batch)
}
3. 处理时区
typescript
// 定义时区感知的时间戳
export const events = pgTable('events', {
id: serial('id').primaryKey(),
// 存储 UTC 时间
occurredAt: timestamp('occurred_at', { withTimezone: true }).notNull(),
// 本地时间展示
localTime: timestamp('local_time', { withTimezone: false }),
})
// 查询时转换时区
const eventsInLocalTime = await db
.select({
id: events.id,
occurredAt: sql<string>`${events.occurredAt} AT TIME ZONE 'Asia/Shanghai'`,
})
.from(events)
性能监控
typescript
// 添加查询日志
const db = drizzle(client, {
logger: {
logQuery(query, params) {
console.log('Query:', query)
console.log('Params:', params)
},
},
})
// 性能追踪
async function trackQuery<T>(
name: string,
queryFn: () => Promise<T>
): Promise<T> {
const start = performance.now()
try {
const result = await queryFn()
const duration = performance.now() - start
console.log(`Query "${name}" took ${duration.toFixed(2)}ms`)
return result
} catch (error) {
console.error(`Query "${name}" failed:`, error)
throw error
}
}
// 使用
const users = await trackQuery('getActiveUsers', () =>
db.select().from(users).where(eq(users.isActive, true))
)
总结
Drizzle ORM 提供了一种全新的数据库操作方式:
- 类型安全:从定义到查询,全程 TypeScript 保护
- 高性能:接近原生 SQL 的执行效率
- 灵活性:支持复杂查询和原生 SQL
- 开发体验:优秀的 IDE 支持和错误提示
- 轻量级:最小的运行时开销
无论你是在构建小型项目还是大型应用,Drizzle 都能提供可靠的数据库解决方案。它特别适合:
- 重视类型安全的 TypeScript 项目
- 需要复杂查询的应用
- 对性能有高要求的场景
- 希望保持 SQL 控制力的开发者
希望这篇指南能帮助你掌握 Drizzle ORM,在项目中发挥它的最大价值!
相关资源
如果你觉得这篇文章有帮助,请点赞支持!有任何问题欢迎在评论区讨论。