【n8n教程】:SSE Trigger节点,实时接收服务器推送事件

本教程将带你从零开始掌握 SSE Trigger 节点,一个强大的实时数据接收工具。如果你想让 n8n 工作流能够实时接收来自服务器的消息推送,而不是被动等待请求,那么这个教程就是为你准备的。

🎯 学习目标


📖 第一部分:核心概念速览

什么是 SSE(Server-Sent Events)?

SSE 是一种服务器推送技术,它通过 HTTP 连接让服务器能够主动向客户端发送数据。想象这样一个场景:

SSE Trigger 节点的作用就是让 n8n 工作流成为一个**"监听器"**,时刻准备接收来自指定 URL 的服务器推送事件。

SSE 对比其他触发方式

触发方式工作原理适用场景实时性
Webhook外部系统推送数据接收第三方应用事件⭐⭐⭐⭐⭐
SSE Trigger连接服务器持续接收实时数据流、市场行情⭐⭐⭐⭐⭐
Schedule定时执行定期任务
HTTP Request主动查询按需获取数据⭐⭐

🔧 第二部分:SSE Trigger 节点配置详解

节点参数说明

SSE Trigger 节点配置非常简洁,只有一个核心参数:

URL(必填)

配置步骤

  1. 1. 添加 SSE Trigger 节点
    • • 在 n8n 工作流编辑器中点击 "+" 添加节点
    • • 在搜索框输入 "SSE Trigger"
    • • 点击选择该节点
  2. 2. 输入 SSE 服务器 URL
    • • 在节点的 URL 参数中输入你的服务器地址
    • • 确保 URL 是有效的且服务器支持 SSE 协议
  3. 3. 保存并激活
    • • 点击 "Save" 保存配置
    • • 节点会开始监听指定 URL 的事件

工作原理流程


    
    
    
  ┌─────────────────┐
│   n8n 工作流     │
│  (SSE Trigger)  │
└────────┬────────┘
         │
         │ 建立长连接(HTTP streaming)
         ↓
┌─────────────────┐
│   SSE 服务器    │
│  (你的数据源)   │
└────────┬────────┘
         │
         │ 推送事件数据
         ↓
    处理数据流
    │ → 格式化
    │ → 条件判断
    │ → 发送通知
    └ → 存储记录

💡 第三部分:实战案例 - 构建实时股票行情监控工作流

案例场景

我们将创建一个实时股票价格监控系统,它会:

可执行工作流代码

以下是完整的可执行工作流 JSON 代码,你可以直接导入到 n8n:


    
    
    
  {
  "name"
: "实时股票监控 - SSE Trigger 工作流",
  "nodes"
: [
    {

      "parameters"
: {
        "url"
: "https://n8n.io"
      }
,
      "id"
: "SSE Trigger",
      "name"
: "SSE Trigger",
      "type"
: "n8n-nodes-base.sseTrigger",
      "typeVersion"
: 1,
      "position"
: [250, 300]
    }
,
    {

      "parameters"
: {
        "functionCode"
: "// 接收来自 SSE 的原始数据并解析\nconst data = $input.first().json;\nconsole.log('收到 SSE 事件:', data);\n\n// 模拟股票数据处理\nreturn {\n  json: {\n    timestamp: new Date().toISOString(),\n    symbol: data.symbol || 'STOCK-001',\n    price: data.price || 100,\n    previousPrice: data.previousPrice || 99,\n    change: (data.price - data.previousPrice).toFixed(2),\n    changePercent: (((data.price - data.previousPrice) / data.previousPrice) * 100).toFixed(2)\n  }\n};"
      }
,
      "id"
: "Function - 解析数据",
      "name"
: "Function - 解析数据",
      "type"
: "n8n-nodes-base.function",
      "typeVersion"
: 1,
      "position"
: [450, 300],
      "credentials"
: []
    }
,
    {

      "parameters"
: {
        "conditions"
: {
          "string"
: [
            {

              "value1"
: "={{ $json.change }}",
              "operation"
: "notEmpty"
            }

          ]

        }

      }
,
      "id"
: "If - 价格变动检测",
      "name"
: "If - 价格变动检测",
      "type"
: "n8n-nodes-base.if",
      "typeVersion"
: 1,
      "position"
: [650, 300]
    }
,
    {

      "parameters"
: {
        "functionCode"
: "// 生成提醒消息\nconst data = $input.first().json;\nconst changePercent = parseFloat(data.changePercent);\nconst alertLevel = Math.abs(changePercent) > 2 ? '高' : '中';\n\nreturn {\n  json: {\n    message: `🔔 ${data.symbol} 价格更新: $${data.price} (变化: ${data.change > 0 ? '+' : ''}${data.change}%, 警报等级: ${alertLevel})`,\n    alertLevel: alertLevel,\n    shouldNotify: Math.abs(changePercent) > 1,\n    data: data\n  }\n};"
      }
,
      "id"
: "Function - 生成警报",
      "name"
: "Function - 生成警报",
      "type"
: "n8n-nodes-base.function",
      "typeVersion"
: 1,
      "position"
: [850, 250]
    }
,
    {

      "parameters"
: {
        "method"
: "POST",
        "url"
: "={{ $env.WEBHOOK_URL || 'https://webhook.site/unique-id' }}",
        "sendBody"
: true,
        "bodyParameters"
: {
          "parameters"
: [
            {

              "name"
: "timestamp",
              "value"
: "={{ $json.data.timestamp }}"
            }
,
            {

              "name"
: "alert_message",
              "value"
: "={{ $json.message }}"
            }
,
            {

              "name"
: "stock_symbol",
              "value"
: "={{ $json.data.symbol }}"
            }
,
            {

              "name"
: "price",
              "value"
: "={{ $json.data.price }}"
            }
,
            {

              "name"
: "alert_level",
              "value"
: "={{ $json.alertLevel }}"
            }

          ]

        }

      }
,
      "id"
: "HTTP Request - 发送警报",
      "name"
: "HTTP Request - 发送警报",
      "type"
: "n8n-nodes-base.httpRequest",
      "typeVersion"
: 4,
      "position"
: [1050, 250]
    }
,
    {

      "parameters"
: {
        "table"
: "stock_alerts",
        "columns"
: "timestamp,symbol,price,change,alert_level",
        "mode"
: "insert"
      }
,
      "id"
: "SQLite - 记录日志",
      "name"
: "SQLite - 记录日志",
      "type"
: "n8n-nodes-base.sqlite",
      "typeVersion"
: 1,
      "position"
: [1050, 350]
    }

  ]
,
  "connections"
: {
    "SSE Trigger"
: {
      "main"
: [
        [

          {

            "node"
: "Function - 解析数据",
            "type"
: "main",
            "index"
: 0
          }

        ]

      ]

    }
,
    "Function - 解析数据"
: {
      "main"
: [
        [

          {

            "node"
: "If - 价格变动检测",
            "type"
: "main",
            "index"
: 0
          }

        ]

      ]

    }
,
    "If - 价格变动检测"
: {
      "main"
: [
        [

          {

            "node"
: "Function - 生成警报",
            "type"
: "main",
            "index"
: 0
          }

        ]
,
        [
]
      ]

    }
,
    "Function - 生成警报"
: {
      "main"
: [
        [

          {

            "node"
: "HTTP Request - 发送警报",
            "type"
: "main",
            "index"
: 0
          }
,
          {

            "node"
: "SQLite - 记录日志",
            "type"
: "main",
            "index"
: 0
          }

        ]

      ]

    }

  }

}

工作流导入步骤

  1. 1. 复制上面的完整 JSON 代码
  2. 2. 打开 n8n 工作流编辑器
  3. 3. 点击右上角菜单 → "Import from File/URL"
  4. 4. 选择"Import from URL"或粘贴 JSON
  5. 5. 点击"Import"完成

工作流执行流程


    
    
    
  1️⃣ SSE Trigger 连接
   ↓
   监听 https://n8n.io 的服务器推送事件
   
2️⃣ 解析数据
   ↓
   提取股票代码、价格、变动情况
   
3️⃣ 条件判断
   ↓
   检查是否有价格变动
   
4️⃣ 生成警报
   ↓
   计算变化幅度,确定警报等级
   
5️⃣ 双路输出
   ├─ HTTP 请求:发送警报通知
   └─ 数据库:记录历史日志

🚀 第四部分:进阶技巧

1. 处理认证(如需要)

如果你的 SSE 服务器需要身份验证,可以在 URL 中包含认证信息:


    
    
    
  https://username:password@api.example.com/stream

或使用环境变量:


    
    
    
  https://{{ $env.SSE_API_KEY }}@api.example.com/stream

2. 错误处理

使用 Error Trigger 节点捕获连接失败:


    
    
    
  {
  "id"
: "Error Trigger",
  "name"
: "Error Trigger",
  "type"
: "n8n-nodes-base.errorTrigger",
  "parameters"
: {}
}

3. 添加连接超时控制

在连接的 Function 节点中添加超时检查:


    
    
    
  // 检查连接状态
if
 ($input.first().json.error) {
  console
.error('SSE 连接失败:', $input.first().json.error);
  return
 { json: { status: 'failed' }};
}
return
 { json: { status: 'connected' }};

4. 数据过滤优化

使用 Filter 节点只处理符合条件的数据:


    
    
    
  // 只处理价格变动超过 1% 的事件
const
 changePercent = parseFloat($json.changePercent);
return
 Math.abs(changePercent) > 1;

⚠️ 常见问题解答

Q1:工作流激活后没有接收到事件?

解决方案:

Q2:如何测试 SSE Trigger 是否工作?

解决方案:

Q3:一个工作流中可以有多个 SSE Trigger 吗?

回答: 可以的!每个 SSE Trigger 连接到不同的 URL,就能同时监听多个数据源。

Q4:SSE 连接会自动重连吗?

回答: n8n 的 SSE Trigger 会在连接断开时自动尝试重新连接,但可能需要几秒钟。


📊 性能建议


🎓 总结

要点说明
SSE 是什么服务器推送事件,实现实时数据流
何时使用需要实时接收服务器数据更新
核心参数URL(服务器地址)
最佳实践结合 Function 节点进行数据处理和条件判断
监控日志记录所有接收的事件便于调试

引用链接

[1] 官方文档: https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.ssetrigger/
[2] n8n系列教程: https://www.undsky.com/blog/?category=n8n%E6%95%99%E7%A8%8B#