n8n 是一个强大的开源工作流自动化平台。其中 Code Node 是一个瑞士军刀级别的工具,允许你在工作流中嵌入自定义的 JavaScript 或 Python 代码,处理复杂的数据转换和业务逻辑。
如果标准的 n8n 节点无法满足你的需求,Code Node 就是你的解决方案。通过它,你可以:
Code Node 是 n8n 中的一个特殊节点,允许你在工作流中直接编写代码。它支持两种编程语言:
| 特性 | JavaScript | Python |
|---|---|---|
| 执行效率 | ⭐⭐⭐⭐⭐ 最快 | ⭐⭐⭐ 中等 |
| 适用场景 | 快速数据转换、API 调用 | 数据科学、复杂计算 |
| 学习曲线 | 相对简单 | 需要 Python 基础 |
| 推荐 | 初学者优选 | 有 Python 经验的用户 |
建议: 本教程重点使用 JavaScript,因为它的执行效率更高,且几乎所有 n8n 用户都能理解 JavaScript 的基础语法。
在 Code Node 中,你可以通过几种方式访问来自前一个节点的数据:
// 获取当前项的数据(仅当 Code Node 设置为逐项处理时)
const currentItem = $input.item.json;
// 获取所有数据项(适合需要汇总或比较的场景)
const allItems = $input.all();
// 获取指定索引的项
const firstItem = $items()[0].json;
// 获取项目总数
const totalCount = $input.all().length;Code Node 的返回值决定了传递给下一个节点的数据:
// 返回单个对象
return {
name: "张三",
email: "zhangsan@example.com"
};
// 返回数组(每个数组元素会成为一个独立项)
return [
{ id: 1, name: "项目A" },
{ id: 2, name: "项目B" }
];
// 返回原始输入并添加新字段
return {
...currentItem,
processedAt: new Date().toISOString(),
status: "completed"
};Code Node 提供了两种执行模式:
| 模式 | 说明 | 使用场景 |
|---|---|---|
| 逐项处理 (For Each Item) | 为每个输入项单独执行一次 | 数据转换、字段添加 |
| 一次全部处理 (Run Once for All) | 仅执行一次,处理所有项 | 数据聚合、统计、排序 |
// 示例:为每个用户添加欢迎消息
const user = $input.item.json;
return {
...user,
greeting: `欢迎 ${user.name}!`,
processedAt: new Date().toISOString()
};console.log() 输出调试信息在数据处理工作流中,常常需要知道前一个节点返回了多少个数据项。这对于后续的条件判断或日志记录很重要。
// 设置 Code Node 为"逐项处理"模式
// 使用 $items() 函数获取所有项
const allItems = $items();
const totalItems = allItems.length;
const currentIndex = allItems.findIndex(item => item.json === $input.item.json);
return {
message: `正在处理第 ${currentIndex + 1} 项,总共 ${totalItems} 项`,
totalCount: totalItems,
currentIndex: currentIndex
};// 设置 Code Node 为"一次全部处理"模式
// 直接使用 $input.all() 获取所有项
const allItems = $input.all();
const itemCount = allItems.length;
return {
totalProcessed: itemCount,
message: `成功处理了 ${itemCount} 个数据项`,
timestamp: new Date().toISOString()
};// 在"一次全部处理"模式下
const orders = $input.all();
const totalOrders = orders.length;
const totalAmount = orders.reduce((sum, order) => sum + (order.json.amount || 0), 0);
const successCount = orders.filter(order => order.json.status === 'completed').length;
return {
report: {
totalOrders: totalOrders,
completedOrders: successCount,
totalRevenue: totalAmount.toFixed(2),
averageOrderValue: (totalAmount / totalOrders).toFixed(2),
processTime: new Date().toISOString()
}
};二进制数据包括文件、图片、PDF 等。在 n8n 中处理二进制数据是常见需求。
// 使用 getBinaryDataBuffer() 函数
// 注意:此方法仅在 JavaScript 中可用,Python 不支持
const binaryBuffer = await this.helpers.getBinaryDataBuffer('data');
// 现在你可以对 binaryBuffer 进行操作
// 例如:计算哈希值、修改内容等// 在"一次全部处理"模式下
const items = $input.all();
let results = [];
for (const item of items) {
// 检查是否存在二进制数据
if (item.binary && item.binary.data) {
const fileName = item.binary.data.fileName;
const fileSize = item.binary.data.fileSize;
const mimeType = item.binary.data.mimeType;
results.push({
fileName: fileName,
fileSize: `${(fileSize / 1024).toFixed(2)} KB`,
mimeType: mimeType,
status: 'processed'
});
}
}
return results;// 获取二进制数据并转换为文本
const binaryData = await this.helpers.getBinaryDataBuffer('data');
const csvContent = binaryData.toString('utf-8');
// 修改内容(例如添加列标题)
const modifiedContent = 'ID,Name,Email\n' + csvContent;
// 转换回 Base64 编码以保存
const base64Encoded = Buffer.from(modifiedContent).toString('base64');
return {
binary: {
data: {
data: base64Encoded,
fileName: 'modified-data.csv',
mimeType: 'text/csv'
}
}
};// JavaScript 调试
console.log('当前用户:', $input.item.json);
console.log('所有项数:', $input.all().length);
console.log('执行时间:', new Date().toISOString());
// 返回数据到下一个节点
return $input.item.json;有时候 console.log() 会输出 [object Object],这表示打印的是一个对象。解决方法:
// ❌ 会输出 [object Object]
console.log('数据:', complexObject);
// ✅ 正确做法:转换为 JSON 字符串
console.log('数据:', JSON.stringify(complexObject, null, 2));
// ✅ 或者使用 print() 函数(Python)
print(str(complexObject))如果使用 Python,处理 JsProxy 对象时需要特殊转换:
import json
from pyodide.ffi import to_py
# 获取前一个节点的数据
prev_data = to_py($input.all())
print(f"处理的项数: {len(prev_data)}")
print(json.dumps(prev_data, indent=2))
return prev_data这个案例展示了一个完整的工作流,包括:
{
"name": "数据处理与通知工作流",
"description": "完整的n8n工作流示例:接收数据,进行处理,然后发送通知",
"nodes": [
{
"parameters": {
"path": "/webhook/data-processor",
"responseMode": "onReceived",
"method": "POST"
},
"id": "webhook_trigger",
"name": "Webhook 触发器",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [50, 150]
},
{
"parameters": {
"mode": "runOnceForAllItems",
"jsCode": "// 提取上传的数据文件\nconst items = $input.all();\n\nlet processedData = [];\nfor (let item of items) {\n const fileName = item.binary?.data?.fileName || item.json?.name || 'unknown';\n const fileSize = item.binary?.data?.fileSize || 0;\n const mimeType = item.binary?.data?.mimeType || 'unknown';\n \n processedData.push({\n fileName: fileName,\n fileSize: (fileSize / 1024).toFixed(2) + ' KB',\n mimeType: mimeType,\n processTime: new Date().toISOString(),\n status: 'processed'\n });\n}\n\nreturn processedData;"
},
"id": "code_node_process",
"name": "数据处理 Code Node",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [250, 150]
},
{
"parameters": {
"mode": "runOnceForAllItems",
"jsCode": "// 获取前一个节点返回的项目数并生成统计报告\nconst allItems = $input.all();\nconst itemCount = allItems.length;\n\n// 计算总大小\nlet totalSize = 0;\nallItems.forEach(item => {\n const sizeStr = item.json.fileSize.replace(' KB', '');\n totalSize += parseFloat(sizeStr);\n});\n\nreturn {\n message: `成功处理了 ${itemCount} 个数据项`,\n totalItems: itemCount,\n totalSize: totalSize.toFixed(2) + ' KB',\n processingDetails: allItems,\n timestamp: new Date().toISOString()\n};"
},
"id": "code_node_summary",
"name": "统计汇总 Code Node",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [450, 150]
},
{
"parameters": {
"jsCode": "// 生成最终通知消息\nconst summary = $input.item.json;\nconst notificationText = `\n✅ 数据处理完成!\n\n📊 处理统计:\n• 处理项数:${summary.totalItems}\n• 总大小:${summary.totalSize}\n• 完成时间:${new Date(summary.timestamp).toLocaleString('zh-CN')}\n\n📝 详细信息:\n${summary.processingDetails.map((item, idx) => \n `${idx + 1}. ${item.fileName} (${item.fileSize}, ${item.mimeType})`\n).join('\\n')}\n`;\n\nreturn {\n notification: notificationText,\n status: 'success'\n};"
},
"id": "final_message",
"name": "最终通知消息",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [650, 150]
}
],
"connections": {
"webhook_trigger": {
"main": [\n [\n {\n "node": "code_node_process",\n "type": "main",\n "index": 0\n }\n ]\n ]\n },
"code_node_process": {
"main": [\n [\n {\n "node": "code_node_summary",\n "type": "main",\n "index": 0\n }\n ]\n ]\n },\n "code_node_summary": {
"main": [\n [\n {\n "node": "final_message",\n "type": "main",\n "index": 0\n }\n ]\n ]\n }\n },\n "active": false,
"settings": {}\n}| 节点 | 作用 | Code 语言 |
|---|---|---|
| Webhook 触发器 | 接收 HTTP POST 请求,启动工作流 | 无 |
| 数据处理 Code Node | 逐一处理每个上传的文件,提取元数据 | JavaScript |
| 统计汇总 Code Node | 聚合所有处理结果,生成统计报告 | JavaScript |
| 最终通知消息 | 生成格式化的通知文本,可发送到邮件/Slack | JavaScript |
A: 使用 $node 对象访问其他节点的数据:
// 访问名为"Node1"的节点的 JSON 数据
const nodeData = $node["Node1"].json;
// 访问二进制数据
const binaryData = $node["Node1"].binary.data;A: 常见错误排查:
?.A: 使用以下最佳实践:
// ✅ 好的做法:使用 Map 而不是嵌套循环查找
const userMap = new Map(users.map(u => [u.id, u]));
const result = orders.map(order => ({
...order,
user: userMap.get(order.userId)
}));
// ❌ 避免:嵌套循环会导致 O(n²) 复杂度
const result = orders.map(order => ({
...order,
user: users.find(u => u.id === order.userId)
}));A: n8n 提供了 $request 和 this.helpers.httpRequest() 供调用 API。对于 npm 包,需要使用 n8n 内置的库或通过自定义节点添加。
// 使用 n8n 的内置 HTTP 工具
const response = await this.helpers.httpRequest({
method: 'GET',
url: 'https://api.example.com/data'
});
return response;| 问题 | 解决方案 |
|---|---|
| 处理大数组慢 | 使用 Map 而不是 .find();使用数组方法如 reduce() 代替循环 |
| 内存占用高 | 使用流处理;分批处理数据 |
| 代码执行超时 | 将复杂逻辑拆分为多个 Code Node;使用"一次全部处理"模式 |
| 频繁调用 API | 使用缓存;批量请求 |
Code Node 是 n8n 中最强大的工具之一,掌握它的几个关键要点:
$input.item.json 和 $input.all() 的区别console.log() 和浏览器控制台进行调试getBinaryDataBuffer() 处理文件[1] 官方文档: https://docs.n8n.io/code/cookbook/code-node/
[2] n8n系列教程: https://www.undsky.com/blog/?category=n8n%E6%95%99%E7%A8%8B#