本教程将带你从零开始掌握 SSE Trigger 节点,一个强大的实时数据接收工具。如果你想让 n8n 工作流能够实时接收来自服务器的消息推送,而不是被动等待请求,那么这个教程就是为你准备的。
SSE 是一种服务器推送技术,它通过 HTTP 连接让服务器能够主动向客户端发送数据。想象这样一个场景:
SSE Trigger 节点的作用就是让 n8n 工作流成为一个**"监听器"**,时刻准备接收来自指定 URL 的服务器推送事件。
| 触发方式 | 工作原理 | 适用场景 | 实时性 |
|---|---|---|---|
| Webhook | 外部系统推送数据 | 接收第三方应用事件 | ⭐⭐⭐⭐⭐ |
| SSE Trigger | 连接服务器持续接收 | 实时数据流、市场行情 | ⭐⭐⭐⭐⭐ |
| Schedule | 定时执行 | 定期任务 | ⭐ |
| HTTP Request | 主动查询 | 按需获取数据 | ⭐⭐ |
SSE Trigger 节点配置非常简洁,只有一个核心参数:
https://api.example.com/stream/eventshttps://stock-api.demo.com/sse/tickerhttp://localhost:8080/stream┌─────────────────┐
│ n8n 工作流 │
│ (SSE Trigger) │
└────────┬────────┘
│
│ 建立长连接(HTTP streaming)
↓
┌─────────────────┐
│ SSE 服务器 │
│ (你的数据源) │
└────────┬────────┘
│
│ 推送事件数据
↓
处理数据流
│ → 格式化
│ → 条件判断
│ → 发送通知
└ → 存储记录我们将创建一个实时股票价格监控系统,它会:
以下是完整的可执行工作流 JSON 代码,你可以直接导入到 n8n:
{
"name": "实时股票监控 - SSE Trigger 工作流",
"nodes": [
{
"parameters": {
"url": "https://n8n.io"
},
"id": "SSE Trigger",
"name": "SSE Trigger",
"type": "n8n-nodes-base.sseTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"functionCode": "// 接收来自 SSE 的原始数据并解析\nconst data = $input.first().json;\nconsole.log('收到 SSE 事件:', data);\n\n// 模拟股票数据处理\nreturn {\n json: {\n timestamp: new Date().toISOString(),\n symbol: data.symbol || 'STOCK-001',\n price: data.price || 100,\n previousPrice: data.previousPrice || 99,\n change: (data.price - data.previousPrice).toFixed(2),\n changePercent: (((data.price - data.previousPrice) / data.previousPrice) * 100).toFixed(2)\n }\n};"
},
"id": "Function - 解析数据",
"name": "Function - 解析数据",
"type": "n8n-nodes-base.function",
"typeVersion": 1,
"position": [450, 300],
"credentials": []
},
{
"parameters": {
"conditions": {
"string": [
{
"value1": "={{ $json.change }}",
"operation": "notEmpty"
}
]
}
},
"id": "If - 价格变动检测",
"name": "If - 价格变动检测",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [650, 300]
},
{
"parameters": {
"functionCode": "// 生成提醒消息\nconst data = $input.first().json;\nconst changePercent = parseFloat(data.changePercent);\nconst alertLevel = Math.abs(changePercent) > 2 ? '高' : '中';\n\nreturn {\n json: {\n message: `🔔 ${data.symbol} 价格更新: $${data.price} (变化: ${data.change > 0 ? '+' : ''}${data.change}%, 警报等级: ${alertLevel})`,\n alertLevel: alertLevel,\n shouldNotify: Math.abs(changePercent) > 1,\n data: data\n }\n};"
},
"id": "Function - 生成警报",
"name": "Function - 生成警报",
"type": "n8n-nodes-base.function",
"typeVersion": 1,
"position": [850, 250]
},
{
"parameters": {
"method": "POST",
"url": "={{ $env.WEBHOOK_URL || 'https://webhook.site/unique-id' }}",
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "timestamp",
"value": "={{ $json.data.timestamp }}"
},
{
"name": "alert_message",
"value": "={{ $json.message }}"
},
{
"name": "stock_symbol",
"value": "={{ $json.data.symbol }}"
},
{
"name": "price",
"value": "={{ $json.data.price }}"
},
{
"name": "alert_level",
"value": "={{ $json.alertLevel }}"
}
]
}
},
"id": "HTTP Request - 发送警报",
"name": "HTTP Request - 发送警报",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [1050, 250]
},
{
"parameters": {
"table": "stock_alerts",
"columns": "timestamp,symbol,price,change,alert_level",
"mode": "insert"
},
"id": "SQLite - 记录日志",
"name": "SQLite - 记录日志",
"type": "n8n-nodes-base.sqlite",
"typeVersion": 1,
"position": [1050, 350]
}
],
"connections": {
"SSE Trigger": {
"main": [
[
{
"node": "Function - 解析数据",
"type": "main",
"index": 0
}
]
]
},
"Function - 解析数据": {
"main": [
[
{
"node": "If - 价格变动检测",
"type": "main",
"index": 0
}
]
]
},
"If - 价格变动检测": {
"main": [
[
{
"node": "Function - 生成警报",
"type": "main",
"index": 0
}
],
[]
]
},
"Function - 生成警报": {
"main": [
[
{
"node": "HTTP Request - 发送警报",
"type": "main",
"index": 0
},
{
"node": "SQLite - 记录日志",
"type": "main",
"index": 0
}
]
]
}
}
}1️⃣ SSE Trigger 连接
↓
监听 https://n8n.io 的服务器推送事件
2️⃣ 解析数据
↓
提取股票代码、价格、变动情况
3️⃣ 条件判断
↓
检查是否有价格变动
4️⃣ 生成警报
↓
计算变化幅度,确定警报等级
5️⃣ 双路输出
├─ HTTP 请求:发送警报通知
└─ 数据库:记录历史日志如果你的 SSE 服务器需要身份验证,可以在 URL 中包含认证信息:
https://username:password@api.example.com/stream或使用环境变量:
https://{{ $env.SSE_API_KEY }}@api.example.com/stream使用 Error Trigger 节点捕获连接失败:
{
"id": "Error Trigger",
"name": "Error Trigger",
"type": "n8n-nodes-base.errorTrigger",
"parameters": {}
}在连接的 Function 节点中添加超时检查:
// 检查连接状态
if ($input.first().json.error) {
console.error('SSE 连接失败:', $input.first().json.error);
return { json: { status: 'failed' }};
}
return { json: { status: 'connected' }};使用 Filter 节点只处理符合条件的数据:
// 只处理价格变动超过 1% 的事件
const changePercent = parseFloat($json.changePercent);
return Math.abs(changePercent) > 1;解决方案:
解决方案:
https://stocksera.pythonanywhere.com/api/stock/real-time/回答: 可以的!每个 SSE Trigger 连接到不同的 URL,就能同时监听多个数据源。
回答: n8n 的 SSE Trigger 会在连接断开时自动尝试重新连接,但可能需要几秒钟。
| 要点 | 说明 |
|---|---|
| SSE 是什么 | 服务器推送事件,实现实时数据流 |
| 何时使用 | 需要实时接收服务器数据更新 |
| 核心参数 | URL(服务器地址) |
| 最佳实践 | 结合 Function 节点进行数据处理和条件判断 |
| 监控日志 | 记录所有接收的事件便于调试 |
[1] 官方文档: https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.ssetrigger/
[2] n8n系列教程: https://www.undsky.com/blog/?category=n8n%E6%95%99%E7%A8%8B#