go-redis
go-redis是Go编程语言的官方Redis客户端库,通过提供简单易用的接口来实现与redis服务器的交互。而redis在很多开发场景中都能起到重要作用,这里借助这个库来了解一下redis的相关用法。
go-redis库下载方式:
go get github.com/redis/go-redis/v9
——————————
另外需要注意这个sdk只是可以通过代码来操作redis数据库,故环境中需要存在对应的数据库,macos的下载方式如下:
brew install redis #下载
brew services start redis #开机自动运行,会作为后台常驻服务
brew services stop redis #停止服务
brew services restart redis #重启服务
brew services list #查看服务状态
按需使用命令即可。
简单了解
Redis常见数据类型:
- String(字符串):是redis最基本的类型,一个key对应一个value,可通过命令设置键值对的过期时间
- Hash(哈希):哈希是一个键值对集合。注意并不是会讲数据进行hash加密存储,具体看后续代码。
- List(列表):redis列表是最简单的字符串列表,一般通过这个实现消息队列。
- Set(集合):string类型的无序集合
- zset(sorted set 有序集合):和set一样是string类型的集合且不允许重复,不同的是这里的每个元素都会关联一个double类型的分数,通过这个分数来为成员进行从小打大的排序。
等数据类型,这里就简单看看如上几个就行,后续使用中也许会用到。
后续通过代码来学习go如何操作redis以及redis的一些使用场景。如下string类型的代码:
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
var ctx = context.Background() //让同一批次的操作都在同一个上下文中,方便管理
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "redis",
DB: 0, //使用默认 DB
})
defer rdb.Close()
//string类型键值对基础代码实例:
err := rdb.Set(ctx, "key", "value", 0).Err() //0代表永久不过期
if err != nil {
panic(err)
}
val, err := rdb.Get(ctx, "key").Result()
if err != nil {
panic(err)
}
fmt.Println("放入的key:", val)
//删除操作
err = rdb.Del(ctx, "key").Err()
if err != nil {
panic(err)
}
val, err = rdb.Get(ctx, "key").Result()
if err == redis.Nil {
fmt.Println("成功删除key")
} else {
panic(err)
}
fmt.Println("删除后的key:", val)
//设置过期时间:
err = rdb.Set(ctx, "key1", "value1", 5*time.Second).Err() //设置5秒的延时
if err != nil {
panic(err)
}
val, err = rdb.Get(ctx, "key1").Result()
if err != nil {
panic(err)
}
fmt.Println("放入的key1:", val)
//延时5秒
time.Sleep(5 * time.Second)
val, err = rdb.Get(ctx, "key1").Result()
if err == redis.Nil {
fmt.Println("key1已被删除")
} else {
panic(err)
}
fmt.Println("key1的值:", val)
}
效果如下:
放入的key: value
成功删除key
删除后的key:
放入的key1: value1
key1已被删除
key1的值:
代码简单实现了连接redis并且往里面放入以及取出键值对。
hash存储的代码:
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background() //让同一批次的操作都在同一个上下文中,方便管理
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "redis",
DB: 0, //使用默认 DB
})
defer rdb.Close()
//操作redis中的hash类型:
err := rdb.HSet(ctx, "mykey", "myval", "hello").Err()
if err != nil {
panic(err)
}
err = rdb.HSet(ctx, "mykey", "myval1", "hello1").Err()
if err != nil {
panic(err)
}
val, err := rdb.HGet(ctx, "mykey", "myval").Result()
if err != nil {
panic(err)
}
val1, err := rdb.HGet(ctx, "mykey", "myval1").Result()
if err != nil {
panic(err)
}
fmt.Println("mykey分类下的值:", val, val1)
}
运行效果:
mykey分类下的值: hello hello1
简单来说就是实现了一种分类的操作,可以看成是在mykey这个分类下存放了几个键值对的集合。如下形象的分类:
Key: task99
Fields:
- target: "example.com"
- plugin: "xss_detector"
- status: "running"
- start_time: "2026-04-21"
以集合的方式方便存储不同需求的数据,有利于对redis中存储的数据的结构的理解以及修改。
功能场景
redis在实际使用中的具体功能特点,下述主要是我在dast中可能用到的功能,看看如下场景中利用到了redis的什么功能特点。
消息队列
预期实现场景:管理已下发的任务的进行,采用基于队列的先进先出策略,基于redis的List数据类型实现扫描任务按顺序下发。
这里就直接使用nuclei扫描模块的demo来作为任务下发,然后再通过redis来实现任务管理。就是生产者消费者问题,通过信号量机制来实现对任务并发的控制,可以通过两种方式来实现任务队列:
- 基于Redis的List数据类型
- 基于Redis Stream实现
下面分别讲一下要如何实现。
List类型
列表类型的数据天生适合用于实现消息队列,并且redis还提供了一些命令用于保证需求的实现,这里我们主要是通过go-redis库来操作redis,而go-redis库提供了 LPUSH 从左边推入数据,使用 BRPOP 从右边弹出数据,满足实现先进先出的策略。
实现代码如下:
- services/redis_queue.go基本配置代码:
package services
import (
"context"
"encoding/json"
"fmt"
"time"
goredis "github.com/redis/go-redis/v9"
)
// ScanTask 扫描任务结构
type ScanTask struct {
ID string `json:"id"` // 任务唯一ID
Targets []string `json:"targets"` // 扫描目标列表
TemplateIDs []string `json:"template_ids"` // 模板ID列表
CreatedAt int64 `json:"created_at"` // 创建时间戳
}
// RedisQueue Redis 队列管理器(FIFO 先进先出)
type RedisQueue struct {
client *goredis.Client
ctx context.Context
}
// NewRedisQueue 创建 Redis 队列管理器
func NewRedisQueue(addr, password string, db int) (*RedisQueue, error) {
client := goredis.NewClient(&goredis.Options{
Addr: addr,
Password: password,
DB: db,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
ctx := context.Background()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis connection failed: %w", err)
}
return &RedisQueue{
client: client,
ctx: ctx,
}, nil
}
// Close 关闭 Redis 连接
func (rq *RedisQueue) Close() error {
return rq.client.Close()
}
// PushTask 将任务推送到队列(FIFO:先进先出)
// 使用 LPUSH 从左边推入,配合 BRPOP 从右边弹出,实现先进先出
func (rq *RedisQueue) PushTask(queueName string, task *ScanTask) error {
taskJSON, err := json.Marshal(task)
if err != nil {
return fmt.Errorf("marshal task failed: %w", err)
}
// LPUSH: 从列表左侧插入
if err := rq.client.LPush(rq.ctx, queueName, taskJSON).Err(); err != nil {
return fmt.Errorf("push task to queue failed: %w", err)
}
fmt.Printf("[Producer] Task %s pushed to queue: %s\n", task.ID, queueName)
return nil
}
// PopTask 从队列中弹出任务(阻塞式,FIFO)
// 使用 BRPOP 从右边弹出,如果队列为空则阻塞等待
func (rq *RedisQueue) PopTask(queueName string, timeout time.Duration) (*ScanTask, error) {
// BRPOP: 从列表右侧弹出,阻塞等待直到有数据或超时
result, err := rq.client.BRPop(rq.ctx, timeout, queueName).Result()
if err != nil {
if err == goredis.Nil {
return nil, nil // 超时,没有任务
}
return nil, fmt.Errorf("pop task from queue failed: %w", err)
}
// result[0] 是队列名,result[1] 是数据
var task ScanTask
if err := json.Unmarshal([]byte(result[1]), &task); err != nil {
return nil, fmt.Errorf("unmarshal task failed: %w", err)
}
fmt.Printf("[Consumer] Task %s popped from queue: %s\n", task.ID, queueName)
return &task, nil
}
// GetQueueLength 获取队列长度
func (rq *RedisQueue) GetQueueLength(queueName string) (int64, error) {
length, err := rq.client.LLen(rq.ctx, queueName).Result()
if err != nil {
return 0, fmt.Errorf("get queue length failed: %w", err)
}
return length, nil
}
// ClearQueue 清空队列
func (rq *RedisQueue) ClearQueue(queueName string) error {
if err := rq.client.Del(rq.ctx, queueName).Err(); err != nil {
return fmt.Errorf("clear queue failed: %w", err)
}
fmt.Printf("[Manager] Queue cleared: %s\n", queueName)
return nil
}
- producer.go文件(生产者-任务下发):
package main
import (
"fmt"
"os"
"time"
"Go_ENV/services"
"github.com/google/uuid"
)
// 生产者示例:将扫描任务推送到 Redis 队列(FIFO 先进先出)
func main() {
// 1. 连接 Redis
queue, err := services.NewRedisQueue("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer queue.Close()
fmt.Println("=== Redis FIFO Queue Producer ===\n")
// 2. 创建扫描任务
tasks := []*services.ScanTask{
{
ID: uuid.New().String(),
Targets: []string{"127.0.0.1:8888"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().Unix(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com", "127.0.0.1:8888"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().Unix(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().Unix(),
},
}
// 3. 推送任务到队列(先进先出)
queueName := "nuclei:scan:queue"
fmt.Printf("Pushing %d tasks to queue: %s\n\n", len(tasks), queueName)
for i, task := range tasks {
if err := queue.PushTask(queueName, task); err != nil {
fmt.Fprintf(os.Stderr, "Failed to push task: %v\n", err)
continue
}
fmt.Printf("✓ Task %d: %s (targets: %v)\n", i+1, task.ID, task.Targets)
}
// 4. 查看队列长度
length, err := queue.GetQueueLength(queueName)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to get queue length: %v\n", err)
} else {
fmt.Printf("\n📊 Current queue length: %d\n", length)
}
fmt.Println("\n✅ All tasks pushed successfully!")
fmt.Println("💡 Run consumer.go to process these tasks")
}
- consumer.go(消费者-取数据并下发相关任务):
package main
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"Go_ENV/services"
nuclei "github.com/projectdiscovery/nuclei/v3/lib"
"github.com/projectdiscovery/nuclei/v3/pkg/catalog/disk"
"github.com/projectdiscovery/nuclei/v3/pkg/output"
)
// 批处理模式消费者:处理完所有任务后自动退出
func main() {
// 1. 连接 Redis
queue, err := services.NewRedisQueue("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer queue.Close()
fmt.Println("=== Redis Queue Consumer (Batch Mode) ===")
fmt.Println("Max concurrent tasks: 3")
fmt.Println("Will exit when all tasks are completed\n")
// 2. 设置优雅退出
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("\n\n🛑 Shutting down consumer...")
cancel()
}()
// 3. 创建信号量控制并发数(最多 2 个)
semaphore := make(chan struct{}, 2)
// 4. 使用 WaitGroup 等待所有任务完成
var wg sync.WaitGroup
// 5. 队列名称
queueName := "nuclei:scan:queue"
// 6. 先检查队列长度
initialLength, err := queue.GetQueueLength(queueName)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to get queue length: %v\n", err)
os.Exit(1)
}
if initialLength == 0 {
fmt.Println("📭 Queue is empty, nothing to process")
return
}
fmt.Printf("📊 Found %d tasks in queue\n\n", initialLength)
// 7. 处理任务的循环
emptyCount := 0
maxEmptyCount := 3 // 连续 3 次为空就认为队列已清空
for {
select {
case <-ctx.Done():
fmt.Println("✅ Consumer stopped by signal")
wg.Wait() // 等待所有正在运行的任务完成
return
default:
// 从队列中获取任务(短超时,避免长时间阻塞)
task, err := queue.PopTask(queueName, 1*time.Second)
if err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to pop task: %v\n", err)
continue
}
if task == nil {
// 没有任务
emptyCount++
fmt.Printf("📭 No tasks available (%d/%d)\n", emptyCount, maxEmptyCount)
if emptyCount >= maxEmptyCount {
fmt.Println("\n🎉 All tasks completed! Waiting for running tasks to finish...")
wg.Wait() // 等待所有正在运行的任务完成
fmt.Println("✅ All tasks finished, exiting")
return
}
continue
}
// 重置空队列计数
emptyCount = 0
// 获取信号量(如果已有 2 个任务在运行,这里会阻塞)
fmt.Printf("\n[%s] 🔄 Trying to acquire semaphore... (current: %d/2)\n",
time.Now().Format("15:04:05.000"), len(semaphore))
semaphore <- struct{}{}
fmt.Printf("[%s] ✅ Semaphore acquired! Current concurrency: %d/2\n",
time.Now().Format("15:04:05.000"), len(semaphore))
// 增加 WaitGroup 计数
wg.Add(1)
// 启动 goroutine 处理任务
go func(t *services.ScanTask) {
startTime := time.Now()
taskIDShort := t.ID[:8] // 只显示前 8 位
defer func() {
duration := time.Since(startTime)
<-semaphore // 释放信号量
fmt.Printf("[%s] 🔓 Semaphore released! Current concurrency: %d/2 (Task %s took %v)\n",
time.Now().Format("15:04:05.000"), len(semaphore), taskIDShort, duration)
wg.Done() // 减少 WaitGroup 计数
}()
fmt.Printf("\n[%s] 🔍 [%s] Starting scan\n", time.Now().Format("15:04:05.000"), taskIDShort)
fmt.Printf(" Targets: %v\n", t.Targets)
fmt.Printf(" Templates: %v\n", t.TemplateIDs)
fmt.Printf(" Created: %s\n", time.Unix(t.CreatedAt, 0).Format("2006-01-02 15:04:05"))
// 执行 Nuclei 扫描
if err := runNucleiScan(ctx, t); err != nil {
fmt.Fprintf(os.Stderr, "[%s] ❌ [%s] Scan failed: %v\n",
time.Now().Format("15:04:05.000"), taskIDShort, err)
} else {
fmt.Printf("[%s] ✅ [%s] Scan completed\n",
time.Now().Format("15:04:05.000"), taskIDShort)
}
}(task)
}
}
}
// runNucleiScan 执行 Nuclei 扫描任务
func runNucleiScan(ctx context.Context, task *services.ScanTask) error {
if len(task.Targets) == 0 {
return fmt.Errorf("targets is empty")
}
// 获取模板目录
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("get home dir failed: %w", err)
}
templateDir := filepath.Join(home, "nuclei-templates")
// 配置 Nuclei 选项
opts := []nuclei.NucleiSDKOptions{
nuclei.WithCatalog(disk.NewCatalog(templateDir)),
nuclei.DisableUpdateCheck(),
}
// 如果指定了模板 ID,添加过滤器
if len(task.TemplateIDs) > 0 {
opts = append(opts, nuclei.WithTemplateFilters(nuclei.TemplateFilters{
IDs: task.TemplateIDs,
}))
}
// 创建 Nuclei 引擎
engine, err := nuclei.NewNucleiEngineCtx(ctx, opts...)
if err != nil {
return fmt.Errorf("create nuclei engine failed: %w", err)
}
defer engine.Close()
// 加载模板
if err := engine.LoadAllTemplates(); err != nil {
return fmt.Errorf("load templates failed: %w", err)
}
// 加载目标
engine.LoadTargets(task.Targets, true)
// 执行扫描并处理结果
fmt.Printf(" [%s] ⏳ [%s] Scanning...\n", time.Now().Format("15:04:05.000"), task.ID[:8])
return engine.ExecuteCallbackWithCtx(ctx, func(ev *output.ResultEvent) {
if ev == nil || !ev.MatcherStatus {
return
}
severity := ev.Info.SeverityHolder.Severity.String()
if severity == "" {
severity = "unknown"
}
fmt.Printf(" [%s] [VULN] [%s] severity=%s template=%s name=%s matched=%s\n",
time.Now().Format("15:04:05.000"),
task.ID[:8],
severity,
ev.TemplateID,
ev.Info.Name,
ev.Matched,
)
})
}
这里的代码设置了需要下发三个任务,然后并发设置成两个,用以模拟对任务下发的控制,最后效果如下:
运行producer.go:

随后可以看一下redis中存储的数据:

成功放入三个待下发任务的task的相关信息。
然后运行consumer.go:
=== Redis Queue Consumer (Batch Mode) ===
Max concurrent tasks: 3
Will exit when all tasks are completed
📊 Found 3 tasks in queue
[Consumer] Task f98fd49a-8c16-4c6e-b1fe-9e5a4246c76a popped from queue: nuclei:scan:queue
[18:23:53.491] 🔍 [f98fd49a] Starting scan
[Consumer] Task d81dba44-34dc-4654-9bb8-518192f51178 popped from queue: nuclei:scan:queue
Targets: [127.0.0.1:8888]
Templates: [local-flask multi-request-check]
[18:23:53.491] 🔍 [d81dba44] Starting scan
Created: 2026-04-24 18:23:30
Targets: [www.baidu.com 127.0.0.1:8888]
Templates: [local-flask multi-request-check]
Created: 2026-04-24 18:23:30
[Consumer] Task 403345d3-09bb-4c44-aa11-7efe729d601c popped from queue: nuclei:scan:queue
[18:23:55.676] ⏳ [f98fd49a] Scanning...
[18:23:55.679] [VULN] [f98fd49a] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[18:23:55.683] ✅ [f98fd49a] Scan completed
[18:23:55.683] 🔓 Semaphore released! Current concurrency: 2/2 (Task f98fd49a took 2.191739208s)
[18:23:55.683] 🔍 [403345d3] Starting scan
Targets: [www.baidu.com]
Templates: [local-flask multi-request-check]
Created: 2026-04-24 18:23:30
[18:23:55.895] ⏳ [d81dba44] Scanning...
[18:23:55.899] [VULN] [d81dba44] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[18:23:56.021] ⏳ [403345d3] Scanning...
[18:23:56.408] [VULN] [d81dba44] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[18:23:56.413] ✅ [d81dba44] Scan completed
[18:23:56.414] 🔓 Semaphore released! Current concurrency: 1/2 (Task d81dba44 took 2.922455292s)
[18:23:56.482] [VULN] [403345d3] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[18:23:56.495] ✅ [403345d3] Scan completed
[18:23:56.495] 🔓 Semaphore released! Current concurrency: 0/2 (Task 403345d3 took 812.515125ms)
📭 No tasks available (1/3)
📭 No tasks available (2/3)
📭 No tasks available (3/3)
🎉 All tasks completed! Waiting for running tasks to finish...
✅ All tasks finished, exiting
观察结果就可以知道成功实现了任务数量的控制,第三个任务必须在第二个任务结束释放信号量后才能下发任务,这里是因为将取任务放在前面的,所以一进入循环就会取任务,然后才是判断信号量是否有空的,当然可以尝试将代码改成优先判断信号量是否有空的,然乎才去取数据,这里不多说。
这里涉及到的知识点:
- 生产者-消费者问题,主要是通过信号量机制来实现的,大学课程有讲过,这里不多说。
- 主要通过redis的list数据类型实现消息队列,通过go-redis库操作redis,将下发任务所需要的数据存入redis数据库中,然后使用 LPUSH 从左边推入数据,使用 BRPOP 从右边弹出数据,实现先进先出策略,和任务下发的顺序有关。
- WaitGroup是一个用于线程同步的库,通过使用
sync.WaitGroup来管理并发任务,可以实现等待goroutine完成后再继续执行主程序,在这里通过代码设置,要求必须在所有任务都结束后才会结束全部主程序,需要关注到如下几个关键函数:- Add():每次激活一个goroutine之前,需要先调用Add()来添加要等待完成的goroutines数量。
- Done():每个goroutine完成时,都需要调用该方法来表示goroutine完成了,该方法会对等待计数器减1(注意Add()和Done的调用次数必须匹配)
- Wait():在等待计数器减为0之前,Wait()会一直阻塞当前的goroutine。
- 具体实现可以看代码,整体机制还是比较有意思的。
另外的需要考虑的点:
- 这里的控制并发数量是通过消费者部分的代码来实现的,但是在分布式系统中,消费者部分的信号量不能共享,后续考虑转为redis计数器来实现类信号量机制来控制整个分布式系统上的任务运行数量。
Redis Stream
Redis Stream是一个仅追加(Append-only)的消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容,用redis的key来区分不同的stream,其具有如下功能:
- 持久化:消息写入后会存入磁盘,redis宕机等意外错误都不会丢消息
- 消息ID的独特性:消息ID的形式是timestampInMillis-sequence(时间戳-序号),比如1777131319422-2,表示当前的消息在1777131319422时产生,并且是该毫秒内产生的第 2 条消息,同时为了保证消息是有序的,因此 Redis 生成的 ID 是单调递增有序的。
- 消费组:消费组是一组消费者,可以协同消费stream中的事件,消费组会维护每个消费者的消费状态,以确保每个事件只被一个消费者处理。
- 确认机制(ACK):当消费者处理完任务后告诉redis已完成,redis才会把该消息从待处理列表中移除。
整体结构大概如下:

其中的关键节点如下:
- Consumer group:消费组,一个消费组有多个消费者。
- Last_delivered_id:游标,每个消费组都有一个游标,任意一个消费者读取了消息都会使这个游标游动,用于表示当前消费组消费到哪条消息了。
- Pending_ids(也被称为PEL):消费者的状态变量,用于维护消费者的未确认的id,pending_ids记录了当前已经被客户端读取的消息,但是还没有发送ack(Acknowledge character:确认字符)
并且在取消息时,Redis Stream也是严格按照ID递增来取的,故非常符合先进先出策略,由此我们也可以通过redis stream来实现消息队列。
代码实现如下:
- services/redis_stream.go文件(基本的配置代码):
package services
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// ScanTask 扫描任务
type ScanTask struct {
ID string
Targets []string
TemplateIDs []string
CreatedAt int64
}
// RedisStream Redis Stream 管理器
type RedisStream struct {
client *redis.Client
ctx context.Context
}
// NewRedisStream 创建 Stream 管理器
func NewRedisStream(addr, password string, db int) (*RedisStream, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
ctx := context.Background()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis connection failed: %w", err)
}
return &RedisStream{client: client, ctx: ctx}, nil
}
func (rs *RedisStream) Close() error {
return rs.client.Close()
}
// CreateConsumerGroup 创建消费者组
func (rs *RedisStream) CreateConsumerGroup(streamName, groupName string) error {
// 尝试创建消费者组,如果已存在会返回错误(忽略)
err := rs.client.XGroupCreateMkStream(rs.ctx, streamName, groupName, "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return err
}
return nil
}
// AddTask 添加任务到 Stream(FIFO:按消息 ID 顺序)
func (rs *RedisStream) AddTask(streamName string, task *ScanTask) (string, error) {
// 直接将整个任务序列化为 JSON
taskJSON, err := json.Marshal(task)
if err != nil {
return "", fmt.Errorf("marshal task failed: %w", err)
}
// XADD: 添加消息到 Stream,Redis 自动生成递增的消息 ID
// 消息 ID 格式:时间戳(毫秒)-序列号,保证严格递增(FIFO)
messageID, err := rs.client.XAdd(rs.ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{
"task": string(taskJSON), // 整个任务作为一个 JSON 字段
},
}).Result()
if err != nil {
return "", err
}
return messageID, nil
}
// ReadTasks 从 Stream 读取任务(消费者组模式,FIFO 顺序)
func (rs *RedisStream) ReadTasks(streamName, groupName, consumerName string, count int64, block time.Duration) ([]redis.XMessage, error) {
// XREADGROUP: 从消费者组读取消息
// ">" 表示只读取未被该消费者组消费的新消息
// Redis 按消息 ID 字典序返回,保证 FIFO(先进先出)
streams, err := rs.client.XReadGroup(rs.ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"}, // ">" = 读取新消息(未被消费的)
Count: count, // 批量读取数量
Block: block, // 阻塞等待时间,用来redis是直接返回空还是让消费者等待一定时间看是否有新的消息进来,0代表无限阻塞直到新消息进来,
}).Result()
if err != nil {
if err == redis.Nil {
return nil, nil // 没有新消息
}
return nil, err
}
if len(streams) == 0 || len(streams[0].Messages) == 0 {
return nil, nil
}
// 返回的消息已按消息 ID 排序(FIFO)
return streams[0].Messages, nil
}
// AckTask 确认任务已处理
func (rs *RedisStream) AckTask(streamName, groupName string, messageIDs ...string) error {
// XACK: 确认消息已处理
return rs.client.XAck(rs.ctx, streamName, groupName, messageIDs...).Err()
}
// GetPendingTasks 获取未确认的任务(用于故障恢复)
func (rs *RedisStream) GetPendingTasks(streamName, groupName string) ([]redis.XPendingExt, error) {
// XPENDING: 查询未确认的消息
pending, err := rs.client.XPendingExt(rs.ctx, &redis.XPendingExtArgs{
Stream: streamName,
Group: groupName,
Start: "-",
End: "+",
Count: 100,
}).Result()
if err != nil {
return nil, err
}
return pending, nil
}
// ClaimPendingTask 认领未确认的任务(从其他消费者)
func (rs *RedisStream) ClaimPendingTask(streamName, groupName, consumerName string, minIdleTime time.Duration, messageIDs ...string) ([]redis.XMessage, error) {
// XCLAIM: 认领其他消费者的未确认消息
messages, err := rs.client.XClaim(rs.ctx, &redis.XClaimArgs{
Stream: streamName,
Group: groupName,
Consumer: consumerName,
MinIdle: minIdleTime,
Messages: messageIDs,
}).Result()
if err != nil {
return nil, err
}
return messages, nil
}
// GetStreamLength 获取 Stream 长度
func (rs *RedisStream) GetStreamLength(streamName string) (int64, error) {
// XLEN: 获取 Stream 中的消息数量
return rs.client.XLen(rs.ctx, streamName).Result()
}
// GetStreamInfo 获取 Stream 信息
func (rs *RedisStream) GetStreamInfo(streamName string) (*redis.XInfoStream, error) {
// XINFO STREAM: 获取 Stream 详细信息
return rs.client.XInfoStream(rs.ctx, streamName).Result()
}
// GetGroupInfo 获取消费者组信息
func (rs *RedisStream) GetGroupInfo(streamName string) ([]redis.XInfoGroup, error) {
// XINFO GROUPS: 获取消费者组信息
return rs.client.XInfoGroups(rs.ctx, streamName).Result()
}
// ParseTask 解析消息为任务
func ParseTask(msg redis.XMessage) (*ScanTask, error) {
// 直接从 JSON 反序列化整个任务
taskJSON, ok := msg.Values["task"].(string)
if !ok {
return nil, fmt.Errorf("task field not found or invalid")
}
var task ScanTask
if err := json.Unmarshal([]byte(taskJSON), &task); err != nil {
return nil, fmt.Errorf("unmarshal task failed: %w", err)
}
return &task, nil
}
- Producer.go文件:
package main
import (
"fmt"
"os"
"time"
"Go_ENV/services"
"github.com/google/uuid"
)
func main() {
stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer stream.Close()
streamName := "scan:tasks"
groupName := "scanners"
// 创建消费者组
if err := stream.CreateConsumerGroup(streamName, groupName); err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer group: %v\n", err)
os.Exit(1)
}
fmt.Println("=== Redis Stream Producer (FIFO Demo) ===\n")
// 创建测试任务
tasks := []*services.ScanTask{
{
ID: uuid.New().String(),
Targets: []string{"127.0.0.1:8888"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com", "127.0.0.1:8888"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"127.0.0.1:8888"},
TemplateIDs: []string{"local-flask"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com"},
TemplateIDs: []string{"multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
}
// 添加任务到 Stream(按顺序添加,验证 FIFO)
fmt.Printf("📤 Adding %d tasks to stream: %s\n", len(tasks), streamName)
fmt.Println(" (Tasks will be consumed in FIFO order by Message ID)\n")
for i, task := range tasks {
// 添加小延迟,确保消息 ID 不同
if i > 0 {
time.Sleep(10 * time.Millisecond)
}
messageID, err := stream.AddTask(streamName, task)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to add task: %v\n", err)
continue
}
fmt.Printf("✓ Task %d: ID=%s, Targets=%v\n → Message ID: %s\n",
i+1, task.ID[:8], task.Targets, messageID)
}
// 显示 Stream 信息
length, _ := stream.GetStreamLength(streamName)
fmt.Printf("\n📊 Stream length: %d messages\n", length)
info, _ := stream.GetStreamInfo(streamName)
fmt.Printf("📊 First entry ID: %s (oldest, will be consumed first)\n", info.FirstEntry.ID)
fmt.Printf("📊 Last entry ID: %s (newest, will be consumed last)\n", info.LastEntry.ID)
groups, _ := stream.GetGroupInfo(streamName)
for _, group := range groups {
fmt.Printf("📊 Group: %s, Pending: %d, Consumers: %d\n",
group.Name, group.Pending, group.Consumers)
}
fmt.Println("\n✅ All tasks added in FIFO order!")
fmt.Println("💡 Run consumer to see tasks consumed in the same order")
}
- Consumer.go(消费者文件):
package main
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"Go_ENV/services"
nuclei "github.com/projectdiscovery/nuclei/v3/lib"
"github.com/projectdiscovery/nuclei/v3/pkg/catalog/disk"
"github.com/projectdiscovery/nuclei/v3/pkg/output"
"github.com/redis/go-redis/v9"
)
func main() {
stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer stream.Close()
streamName := "scan:tasks"
groupName := "scanners"
consumerName := fmt.Sprintf("consumer-%d", time.Now().Unix()) //消费者名称
fmt.Printf("=== Redis Stream Consumer (FIFO Mode) ===\n")
fmt.Printf("Consumer: %s\n", consumerName)
fmt.Printf("Batch size: 3 (fetch 3 tasks at once)\n")
fmt.Printf("Max concurrent: 3\n")
fmt.Printf("💡 Tasks will be consumed in FIFO order (by Message ID)\n\n")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 优雅退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("\n🛑 Shutting down consumer...")
cancel()
}()
// 并发控制:最多 3 个任务同时运行
semaphore := make(chan struct{}, 3)
var wg sync.WaitGroup
// 先处理 Pending 任务(故障恢复)
recoverPendingTasks(stream, streamName, groupName, consumerName, &wg, semaphore)
// 主循环:消费新任务
fmt.Println("🔄 Waiting for new tasks...")
for {
select {
case <-ctx.Done():
wg.Wait()
fmt.Println("✅ Consumer stopped")
return
default:
// 一次拉取 3 条消息(与信号量容量一致)
messages, err := stream.ReadTasks(streamName, groupName, consumerName, 3, 1*time.Second)
if err != nil {
fmt.Fprintf(os.Stderr, "Read error: %v\n", err)
continue
}
if messages == nil {
continue
}
fmt.Printf("\n📦 Fetched %d tasks from stream (FIFO order by Message ID)\n", len(messages))
// 显示消息 ID 顺序,验证 FIFO
for i, msg := range messages {
task, _ := services.ParseTask(msg)
fmt.Printf(" [%d] Task ID: %s - Message ID: %s\n",
i+1, task.ID[:8], msg.ID)
}
fmt.Println()
// 处理每个任务(并发控制)
for i, msg := range messages {
// 获取信号量(如果已有 3 个任务在运行,这里会阻塞)
semaphore <- struct{}{}
wg.Add(1)
go func(m redis.XMessage, index int) {
startTime := time.Now()
defer func() {
duration := time.Since(startTime)
<-semaphore
fmt.Printf("[%s] 🔓 Semaphore released (took %v)\n",
time.Now().Format("15:04:05.000"), duration)
wg.Done()
}()
task, err := services.ParseTask(m)
if err != nil {
fmt.Printf("❌ Parse error: %v\n", err)
return
}
fmt.Printf("\n[%s] 🔍 Task [%s] Starting scan\n",
time.Now().Format("15:04:05.000"), task.ID[:8])
fmt.Printf(" Message ID: %s\n", m.ID)
fmt.Printf(" Targets: %v\n", task.Targets)
fmt.Printf(" Templates: %v\n", task.TemplateIDs)
// 执行 Nuclei 扫描
if err := runNucleiScan(ctx, task); err != nil {
fmt.Printf("[%s] ❌ Task [%s] Scan failed: %v (will retry)\n",
time.Now().Format("15:04:05.000"), task.ID[:8], err)
// 不发送 ACK,任务会被重新分配
return
}
// 任务成功,发送 ACK
if err := stream.AckTask(streamName, groupName, m.ID); err != nil {
fmt.Printf("❌ ACK error: %v\n", err)
return
}
fmt.Printf("[%s] ✅ Task [%s] Completed and ACKed\n",
time.Now().Format("15:04:05.000"), task.ID[:8])
}(msg, i)
}
}
}
}
// recoverPendingTasks 恢复未确认的任务(故障恢复)
func recoverPendingTasks(stream *services.RedisStream, streamName, groupName, consumerName string, wg *sync.WaitGroup, semaphore chan struct{}) {
pending, err := stream.GetPendingTasks(streamName, groupName)
if err != nil {
fmt.Printf("Failed to get pending tasks: %v\n", err)
return
}
if len(pending) == 0 {
return
}
fmt.Printf("🔄 Found %d pending tasks, recovering...\n", len(pending))
for _, p := range pending {
// 认领超过 10 秒未确认的任务
if p.Idle < 10*time.Second {
continue
}
messages, err := stream.ClaimPendingTask(streamName, groupName, consumerName, 10*time.Second, p.ID)
if err != nil {
fmt.Printf("Failed to claim task %s: %v\n", p.ID, err)
continue
}
for _, msg := range messages {
semaphore <- struct{}{}
wg.Add(1)
go func(m redis.XMessage) {
defer func() {
<-semaphore
wg.Done()
}()
task, _ := services.ParseTask(m)
fmt.Printf("🔄 [%s] Recovering task (Message ID: %s)\n", task.ID[:8], m.ID)
// 重新执行 Nuclei 扫描
if err := runNucleiScan(context.Background(), task); err != nil {
fmt.Printf("❌ [%s] Recovered task failed: %v\n", task.ID[:8], err)
return
}
stream.AckTask(streamName, groupName, m.ID)
fmt.Printf("✅ [%s] Recovered task completed\n", task.ID[:8])
}(msg)
}
}
}
// runNucleiScan 执行 Nuclei 扫描
func runNucleiScan(ctx context.Context, task *services.ScanTask) error {
if len(task.Targets) == 0 {
return fmt.Errorf("targets is empty")
}
// 获取模板目录
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("get home dir failed: %w", err)
}
templateDir := filepath.Join(home, "nuclei-templates")
// 配置 Nuclei 选项
opts := []nuclei.NucleiSDKOptions{
nuclei.WithCatalog(disk.NewCatalog(templateDir)),
nuclei.DisableUpdateCheck(),
}
// 如果指定了模板 ID,添加过滤器
if len(task.TemplateIDs) > 0 {
opts = append(opts, nuclei.WithTemplateFilters(nuclei.TemplateFilters{
IDs: task.TemplateIDs,
}))
}
// 创建 Nuclei 引擎
engine, err := nuclei.NewNucleiEngineCtx(ctx, opts...)
if err != nil {
return fmt.Errorf("create nuclei engine failed: %w", err)
}
defer engine.Close()
// 加载模板
if err := engine.LoadAllTemplates(); err != nil {
return fmt.Errorf("load templates failed: %w", err)
}
// 加载目标
engine.LoadTargets(task.Targets, true)
// 执行扫描并处理结果
fmt.Printf(" ⏳ Task [%s] Scanning...\n", task.ID[:8])
return engine.ExecuteCallbackWithCtx(ctx, func(ev *output.ResultEvent) {
if ev == nil || !ev.MatcherStatus {
return
}
severity := ev.Info.SeverityHolder.Severity.String()
if severity == "" {
severity = "unknown"
}
fmt.Printf(" [VULN] Task [%s] severity=%s template=%s name=%s matched=%s\n",
task.ID[:8],
severity,
ev.TemplateID,
ev.Info.Name,
ev.Matched,
)
})
}
- Monitor.go文件(监测文件,用于查看消费组内的信息):
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"Go_ENV/services"
)
func main() {
stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer stream.Close()
streamName := "scan:tasks"
fmt.Println("=== Redis Stream Monitor (FIFO Verification) ===\n")
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
// 初始显示
showStreamInfo(stream, streamName)
for {
select {
case <-sigChan:
fmt.Println("\n🛑 Shutting down monitor...")
return
case <-ticker.C:
showStreamInfo(stream, streamName)
}
}
}
func showStreamInfo(stream *services.RedisStream, streamName string) {
separator := "============================================================"
fmt.Println("\n" + separator)
fmt.Printf("[%s] 📊 Stream Status\n", time.Now().Format("15:04:05"))
fmt.Println(separator)
// Stream 信息
length, _ := stream.GetStreamLength(streamName)
fmt.Printf(" Stream length: %d messages\n", length)
info, err := stream.GetStreamInfo(streamName)
if err == nil {
fmt.Printf(" First entry ID: %s (oldest, FIFO head)\n", info.FirstEntry.ID)
fmt.Printf(" Last entry ID: %s (newest, FIFO tail)\n", info.LastEntry.ID)
}
// 消费者组信息
groups, err := stream.GetGroupInfo(streamName)
if err == nil {
fmt.Println("\n Consumer Groups:")
for _, group := range groups {
fmt.Printf(" - %s: Pending=%d, Consumers=%d, Lag=%d\n",
group.Name, group.Pending, group.Consumers, group.Lag)
}
}
// Pending 任务
pending, err := stream.GetPendingTasks(streamName, "scanners")
if err == nil && len(pending) > 0 {
fmt.Printf("\n ⚠️ Pending tasks: %d\n", len(pending))
for i, p := range pending {
if i >= 5 {
fmt.Printf(" ... and %d more\n", len(pending)-5)
break
}
fmt.Printf(" - %s: Consumer=%s, Idle=%v, Deliveries=%d\n",
p.ID, p.Consumer, p.Idle, p.RetryCount)
}
}
fmt.Println(separator)
}
运行效果如下:
Producer.go文件:
=== Redis Stream Producer (FIFO Demo) ===
📤 Adding 5 tasks to stream: scan:tasks
(Tasks will be consumed in FIFO order by Message ID)
✓ Task 1: ID=639f0385, Targets=[127.0.0.1:8888]
→ Message ID: 1777177221155-0
✓ Task 2: ID=b49c1e37, Targets=[www.baidu.com 127.0.0.1:8888]
→ Message ID: 1777177221166-0
✓ Task 3: ID=39f4df6b, Targets=[www.baidu.com]
→ Message ID: 1777177221177-0
✓ Task 4: ID=bac46c3e, Targets=[127.0.0.1:8888]
→ Message ID: 1777177221188-0
✓ Task 5: ID=c7904c34, Targets=[www.baidu.com]
→ Message ID: 1777177221199-0
📊 Stream length: 5 messages
📊 First entry ID: 1777177221155-0 (oldest, will be consumed first)
📊 Last entry ID: 1777177221199-0 (newest, will be consumed last)
📊 Group: scanners, Pending: 0, Consumers: 0
✅ All tasks added in FIFO order!
💡 Run consumer to see tasks consumed in the same order
然后运行监测文件,出是消息如下:
=== Redis Stream Monitor (FIFO Verification) ===
============================================================
[12:21:24] 📊 Stream Status
============================================================
Stream length: 5 messages
First entry ID: 1777177221155-0 (oldest, FIFO head)
Last entry ID: 1777177221199-0 (newest, FIFO tail)
Consumer Groups:
- scanners: Pending=0, Consumers=0, Lag=5
============================================================
后续每隔3s就会再次获取消费组相关信息。随后运行消费者文件即可:
=== Redis Stream Consumer (FIFO Mode) ===
Consumer: consumer-1777177803
Batch size: 3 (fetch 3 tasks at once)
Max concurrent: 3
💡 Tasks will be consumed in FIFO order (by Message ID)
🔄 Waiting for new tasks...
📦 Fetched 3 tasks from stream (FIFO order by Message ID)
[1] Task ID: 639f0385 - Message ID: 1777177221155-0
[2] Task ID: b49c1e37 - Message ID: 1777177221166-0
[3] Task ID: 39f4df6b - Message ID: 1777177221177-0
[12:30:03.343] 🔍 Task [639f0385] Starting scan
[12:30:03.343] 🔍 Task [b49c1e37] Starting scan
Message ID: 1777177221166-0
Targets: [www.baidu.com 127.0.0.1:8888]
Templates: [local-flask multi-request-check]
[12:30:03.343] 🔍 Task [39f4df6b] Starting scan
Message ID: 1777177221155-0
Targets: [127.0.0.1:8888]
Templates: [local-flask multi-request-check]
Message ID: 1777177221177-0
Targets: [www.baidu.com]
Templates: [local-flask multi-request-check]
📦 Fetched 2 tasks from stream (FIFO order by Message ID)
[1] Task ID: bac46c3e - Message ID: 1777177221188-0
[2] Task ID: c7904c34 - Message ID: 1777177221199-0
⏳ Task [639f0385] Scanning...
[12:30:04.659] ✅ Task [639f0385] Completed and ACKed
[12:30:04.659] 🔓 Semaphore released (took 1.316242208s)
[12:30:04.659] 🔍 Task [bac46c3e] Starting scan
Message ID: 1777177221188-0
Targets: [127.0.0.1:8888]
Templates: [local-flask]
⏳ Task [bac46c3e] Scanning...
[12:30:04.779] ✅ Task [bac46c3e] Completed and ACKed
[12:30:04.779] 🔓 Semaphore released (took 120.020959ms)
[12:30:04.779] 🔍 Task [c7904c34] Starting scan
Message ID: 1777177221199-0
Targets: [www.baidu.com]
Templates: [multi-request-check]
⏳ Task [39f4df6b] Scanning...
⏳ Task [b49c1e37] Scanning...
⏳ Task [c7904c34] Scanning...
[VULN] Task [39f4df6b] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[12:30:05.360] ✅ Task [39f4df6b] Completed and ACKed
[12:30:05.360] 🔓 Semaphore released (took 2.017454292s)
[VULN] Task [b49c1e37] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[12:30:05.454] ✅ Task [b49c1e37] Completed and ACKed
[12:30:05.454] 🔓 Semaphore released (took 2.111749083s)
[VULN] Task [c7904c34] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[12:30:05.596] ✅ Task [c7904c34] Completed and ACKed
[12:30:05.596] 🔓 Semaphore released (took 817.565458ms)
🛑 Shutting down consumer...
✅ Consumer stopped
很快就完成了,monitor.go的终端输出变成了:
============================================================
[12:30:20] 📊 Stream Status
============================================================
Stream length: 5 messages
First entry ID: 1777177221155-0 (oldest, FIFO head)
Last entry ID: 1777177221199-0 (newest, FIFO tail)
Consumer Groups:
- scanners: Pending=0, Consumers=1, Lag=0
============================================================
成功消费完了全部的消息。
简单解析以及一些知识点:
- 上述代码主要任务队列的流程是:producer创建流和消费组并下发任务 =》消费者先监测pel中存在消息,存在就转到自己名下并下发任务 =〉pel中不存在在流中拉取新的消息来下发任务 =》 最后任务扫描结束后就向redis返回ack,pel中就会删除这条数据。由此实现整体内容的覆盖
- redis保证了原子性,处理请求很快,毫秒级别,不会出现什么死锁的情况,比如两个消费者同时请求两个消息,stream会先按顺序递增取两个给一个消费者,然后再继续递增的两个给另一个消费者。故也是非常适合用于实现分布式分布任务的。
- PEL是分两个层级的,如下:
- 消费者级别的PEL:包含该消费者中的所有未确认的消息。
- 消费组级别的PEL:其包含了该消费组中所有的未确认的消息,是所有消费者PEL的并集
- 从官方文档中可以看出,xack命令是操作消费组级别的pel,主要和messageID绑定,当一个ack被返回时,就会清除这个消息的条目。以确保该消息不会被再次处理。
有个问题就是,从代码实例以及简单思考中就可以知道,因为是需要返回ack来表示消费者是否运行完任务,若没返回ack的话就会一直存在于pel中,此时就可能被其他消费者给转移到自己的pel中并下发任务,这样会导致重复下发任务,资源浪费,如何解决呢?至少有两种方式:
- 直接通过XClaim的MinIdle选项来执行,这个选项的含义是只有当这批pending消息在原消费者名下“空闲”了足够长时间,XClaim才允许将其转换给当前消费者。可以参考
ClaimPendingTask函数部分的代码逻辑,自己写个demo测试一下就行,这里不多说了。 - 可以通过分布式锁来保证在运行的数据不会被另外一个消费者获取,并且可以确保唯一性以及针对性。针对这一条消息上锁,防止其被其他的消费者拿走,在实际使用中,可以给这条消息上个1小时的锁,时间到了还没跑完就拿给其他的消费者跑,预防消费者机器突然宕机导致这条数据没有成功完成扫描导致的死锁。具体实现就看后面的分布式锁部分内容。
- 两者对比:从这里来看,其实两种方式都是加一个时间限制来防止在运行的消息被拿给其他消费者跑。但还是有一定问题,比如将时间设置成30分钟,存在如下几个场景:
- 若在时间范围内跑完,那就完全没问题了,锁会被删除,相关消息会直接被消除
- 若大于时间范围,比如说运行需要40分钟,那么设置的时间在30分钟时就会失效,就有可能会被其他消费者拿走导致多次下发任务。针对这种情况,可以尝试使用分布式锁来解决,通过给锁设置成小范围时间,比如6分钟,然后对应消费者在运行时每隔五分钟重置锁的过期时间为6分钟,可以有效防止被其他的消费者拿走,并且不会存在时间浪费,只要成功运行完成,就会直接返回ack并删除锁。
- 故相比较下来个人觉得使用分布式锁来作为是否可以被拿走的依据,具体实现就看后面的分布式锁部分的代码。
后续可以优化的点:
- 个人认为整体在redis stream中更好的实现方式就是消费者进行循环请求,以信号量作为阻塞的点,只要有信号量了,就先去cover掉pel中的,然后才是拉取新的任务,最后实现扫描任务的全覆盖。
两者对比
都可以实现持久化以及先进先出,但是redis stream有两个非常好的点:
- 确认机制(ACK)和PEl :就是通过ack来标识任务是否完成,适用于redis重启和服务宕机的突然解决情况。虽然list类型的消息队列同样的可以尝试新增键值对来实现对任务进度的把控,但是redis stream还有个非常好的点就是消息的长期存储,在pel中还存在对应的消息的内容,而list类型的消息队列是取了就取了,可能还需要其他的代码实现来解决这个数据丢失问题。
- 消费组概念,在分布式系统中我觉得是一个非常好的使用方式,直接把不同的节点放在一个消费组中,便于整体的管理以及修改。
同时通过先cover pel => 再取新的消息这个流程,可以很好的实现并发+先下发的任务先运行,可以极大提高使用以及运行效率。
实时分析
这个板块主要关注如何实现扫描结果的同步。涉及到redis的两个使用场景:Redis Pub/Sub和redis stream,下面来分别看看各自的使用场景,最后再看要选择哪个。
Redis发布订阅
这里涉及到Redis的发布/订阅(Pub/Sub)模式,这是一个消息通信模式,用于实现消息的发布和订阅,具体如下:
- 大概实现方式就是一个消息发布者将消息发布到一个频道(channel),而一个或多个消息订阅者可以订阅该频道以接收消息。
- 发布者和订阅者之间是解耦合的,发布者不需要知道订阅者的身份,只需将消息发布到指定的频道即可。
- 订阅者可以订阅多个频道,并在每个频道上接收消息,
这个机制可以用于实现数据同步等功能,这里主要就这个点来看看代码是怎么实现的。
Redis Pub/Sub实现的基本代码如下:
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
var ctx = context.Background()
func main() {
// 1. 初始化客户端
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "redis",
DB: 0,
})
channelName := "dast.task.events"
// ---------------- Subscriber (订阅者) ----------------
// 启动一个 goroutine 模拟异步监听
go func() {
pubsub := rdb.Subscribe(ctx, channelName)
defer pubsub.Close()
fmt.Println("[*] 订阅者已就绪,正在监听频道:", channelName)
// 持续接收消息
for {
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
panic(err)
}
fmt.Printf("[Subscriber] 收到新任务详情: %s\n", msg.Payload)
}
}()
// 等待一秒确保订阅者已启动
time.Sleep(time.Second)
// ---------------- Publisher (发布者) ----------------
fmt.Println("[+] 发布者准备发送消息...")
payload := "START_SCAN: https://example.com"
err := rdb.Publish(ctx, channelName, payload).Err()
if err != nil {
panic(err)
}
// 保持主程序运行
select {}
}
实现效果:
[*] 订阅者已就绪,正在监听频道: dast.task.events
[+] 发布者准备发送消息...
[Subscriber] 收到新任务详情: START_SCAN: https://example.com
代码实现细节如下:
- 通过go-redis库的Subscribe函数来订阅一个频道并持续接收消息,然后使用Publish()函数指定channel来公布消息实现数据同步。
- 这里通过
select {}实现保持主程序运行,select是go语言中用于channel操作的语句,语法类似 switch,但是每个case都必须是channel的相关操作,一般来说,如果没有定义defautl语句,select会阻塞等待直到一个case的channel有操作,而这里的代码没有定义case和default,故会一直阻塞,达到了让主程序一直运行的效果。
通过保持主程序运行,让其中的goroutine语句持续生效,由此达到持续接收消息的效果。我们可以直接通过redis语句来往对应频道下发消息,看这里能否接收到:
PUBLISH "dast.task.events" "START_SCAN: hahahahaha,just a test"

接收者同步接收到消息:

————————————————————
Redis Stream
基本概念前面说过,大概思路就是新建一个stream,然后在任务下发、扫描结束的时候往stream中追加任务状态的信息,然后消费者(用于监测)就去抓取并更新对应的任务状态,由此实现数据同步。参考如下架构:
┌─────────────┐
│ 扫描任务 │ (你的业务代码)
└──────┬──────┘
│ 1. 发送进度事件
▼
┌─────────────────────────────┐
│ Redis Stream │ (消息队列)
│ progress:stream │
└──────┬──────────────────────┘
│
│ 2. 多消费者订阅
│
┌───┴────┬─────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│消费者1 │ │监控面板 │ │同步服务│
│(分析) │ │(展示) │ │(持久化)│
└────────┘ └────────┘ └───┬────┘
│ 3. 写入 Redis Hash
▼
┌──────────────┐
│ Redis Hash │ (快速查询)
│task:progress:*│
└──────────────┘
架构如上,具体实现就不多说了。
两者对比
- Redis Pub/Sub:
优点:是解耦合的,发布者和订阅者之间互不影响,适合实时聊天应用程序、实时新闻推送等场景
缺点:Redis发布订阅机制有个问题是消息无法持久化,如果出现网络断开、redis宕机,消息就会被丢弃。
- Redis Stream:
重点就是持久化,可以防止消息被丢弃,同时可以实现数据同步。
分布式锁
预期实现场景:对下发的任务中设置的相关信息进行处理,已实现分布式的调度。
Redis分布式锁是一种基于redis的高性能、高可用的分布式互斥机制,用于解决分布式系统中多个服务实例对共享资源的并发访问问题。其核心原理是利用redis键的唯一性和原子性操作来实现锁的获取与释放。通过对消息进行加锁,可以保证某个资源在同一时间只能被一个执行者处理。
redis分布式锁其实就是一个特殊用法的键值对,其存在如下结构:
- 锁的key:比如
lock:task:123。 - 锁的value:一般将这个value当作操作锁的身份标识。
- ttl:锁的过期时间
一般需要依靠SET命令的两个关键参数:
- NX(Not Exist):仅当键不存在时才能设置成功,确保锁的互斥性。
- PX(Expire):设置键的过期时间,防止死锁。
具体格式形如:
SET key value NX EX ttl
另外还有一个非常重要的点就是redis分布式锁在实际运用中的实现方式,具体可以看后续代码的分析。
————————————
结合到nuclei+redis stream+分布式锁的代码如下:
- services/redis-stream.go:
package services
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// ScanTask 扫描任务
type ScanTask struct {
ID string
Targets []string
TemplateIDs []string
CreatedAt int64
}
// RedisStream Redis Stream 管理器
type RedisStream struct {
client *redis.Client
ctx context.Context
}
// NewRedisStream 创建 Stream 管理器
func NewRedisStream(addr, password string, db int) (*RedisStream, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
ctx := context.Background()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis connection failed: %w", err)
}
return &RedisStream{client: client, ctx: ctx}, nil
}
func (rs *RedisStream) Close() error {
return rs.client.Close()
}
// CreateConsumerGroup 创建消费者组
func (rs *RedisStream) CreateConsumerGroup(streamName, groupName string) error {
// 尝试创建消费者组,如果已存在会返回错误(忽略)
err := rs.client.XGroupCreateMkStream(rs.ctx, streamName, groupName, "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return err
}
return nil
}
// AddTask 添加任务到 Stream(FIFO:按消息 ID 顺序)
func (rs *RedisStream) AddTask(streamName string, task *ScanTask) (string, error) {
// 直接将整个任务序列化为 JSON
taskJSON, err := json.Marshal(task)
if err != nil {
return "", fmt.Errorf("marshal task failed: %w", err)
}
// XADD: 添加消息到 Stream,Redis 自动生成递增的消息 ID
// 消息 ID 格式:时间戳(毫秒)-序列号,保证严格递增(FIFO)
messageID, err := rs.client.XAdd(rs.ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{
"task": string(taskJSON), // 整个任务作为一个 JSON 字段
},
}).Result()
if err != nil {
return "", err
}
return messageID, nil
}
// ReadTasks 从 Stream 读取任务(消费者组模式,FIFO 顺序)
func (rs *RedisStream) ReadTasks(streamName, groupName, consumerName string, count int64, block time.Duration) ([]redis.XMessage, error) {
// XREADGROUP: 从消费者组读取消息
// ">" 表示只读取未被该消费者组消费的新消息
// Redis 按消息 ID 字典序返回,保证 FIFO(先进先出)
streams, err := rs.client.XReadGroup(rs.ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"}, // ">" = 读取新消息(未被消费的)
Count: count, // 批量读取数量
Block: block, // 阻塞等待时间,用来redis是直接返回空还是让消费者等待一定时间看是否有新的消息进来,0代表无限阻塞直到新消息进来,
}).Result()
if err != nil {
if err == redis.Nil {
return nil, nil // 没有新消息
}
return nil, err
}
if len(streams) == 0 || len(streams[0].Messages) == 0 {
return nil, nil
}
// 返回的消息已按消息 ID 排序(FIFO)
return streams[0].Messages, nil
}
// AckTask 确认任务已处理
func (rs *RedisStream) AckTask(streamName, groupName string, messageIDs ...string) error {
// XACK: 确认消息已处理
return rs.client.XAck(rs.ctx, streamName, groupName, messageIDs...).Err()
}
// GetPendingTasks 获取未确认的任务(用于故障恢复)
func (rs *RedisStream) GetPendingTasks(streamName, groupName string) ([]redis.XPendingExt, error) {
// XPENDING: 查询未确认的消息
pending, err := rs.client.XPendingExt(rs.ctx, &redis.XPendingExtArgs{
Stream: streamName,
Group: groupName,
Start: "-",
End: "+",
Count: 100,
}).Result()
if err != nil {
return nil, err
}
return pending, nil
}
// ClaimPendingTask 认领未确认的任务(从其他消费者)
func (rs *RedisStream) ClaimPendingTask(streamName, groupName, consumerName string, minIdleTime time.Duration, messageIDs ...string) ([]redis.XMessage, error) {
// XCLAIM: 认领其他消费者的未确认消息
messages, err := rs.client.XClaim(rs.ctx, &redis.XClaimArgs{
Stream: streamName,
Group: groupName,
Consumer: consumerName,
MinIdle: minIdleTime,
Messages: messageIDs,
}).Result()
if err != nil {
return nil, err
}
return messages, nil
}
// GetStreamLength 获取 Stream 长度
func (rs *RedisStream) GetStreamLength(streamName string) (int64, error) {
// XLEN: 获取 Stream 中的消息数量
return rs.client.XLen(rs.ctx, streamName).Result()
}
// GetStreamInfo 获取 Stream 信息
func (rs *RedisStream) GetStreamInfo(streamName string) (*redis.XInfoStream, error) {
// XINFO STREAM: 获取 Stream 详细信息
return rs.client.XInfoStream(rs.ctx, streamName).Result()
}
// GetGroupInfo 获取消费者组信息
func (rs *RedisStream) GetGroupInfo(streamName string) ([]redis.XInfoGroup, error) {
// XINFO GROUPS: 获取消费者组信息
return rs.client.XInfoGroups(rs.ctx, streamName).Result()
}
// ParseTask 解析消息为任务
func ParseTask(msg redis.XMessage) (*ScanTask, error) {
// 直接从 JSON 反序列化整个任务
taskJSON, ok := msg.Values["task"].(string)
if !ok {
return nil, fmt.Errorf("task field not found or invalid")
}
var task ScanTask
if err := json.Unmarshal([]byte(taskJSON), &task); err != nil {
return nil, fmt.Errorf("unmarshal task failed: %w", err)
}
return &task, nil
}
// AcquireTaskLock 获取任务分布式锁
func (rs *RedisStream) AcquireTaskLock(messageID string, consumerName string, ttl time.Duration) (bool, error) {
lockKey := fmt.Sprintf("lock:task:%s", messageID)
// 使用 SET 命令配合 NX 和 EX 选项实现分布式锁
// NX: 只在键不存在时设置
// TTL: 设置过期时间
result := rs.client.SetArgs(rs.ctx, lockKey, consumerName, redis.SetArgs{
Mode: "NX", // 只在键不存在时设置
TTL: ttl, // 设置过期时间
})
if err := result.Err(); err != nil {
// redis.Nil 表示键已存在(获取锁失败)
if err == redis.Nil {
return false, nil
}
return false, fmt.Errorf("acquire lock failed: %w", err)
}
// 检查返回值,"OK" 表示成功获取锁
val, err := result.Result()
if err != nil {
if err == redis.Nil {
return false, nil
}
return false, fmt.Errorf("acquire lock failed: %w", err)
}
return val == "OK", nil
}
// ReleaseTaskLock 释放任务分布式锁
func (rs *RedisStream) ReleaseTaskLock(messageID string, consumerName string) error {
lockKey := fmt.Sprintf("lock:task:%s", messageID)
// Lua 脚本确保只有锁的持有者才能释放
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
_, err := rs.client.Eval(rs.ctx, script, []string{lockKey}, consumerName).Result()
return err
}
// RenewTaskLock 续期任务锁(用于长时间运行的任务)
func (rs *RedisStream) RenewTaskLock(messageID string, consumerName string, ttl time.Duration) (bool, error) {
lockKey := fmt.Sprintf("lock:task:%s", messageID)
// Lua 脚本确保只有锁的持有者才能续期
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
`
result, err := rs.client.Eval(rs.ctx, script, []string{lockKey}, consumerName, int(ttl.Seconds())).Result()
if err != nil {
return false, err
}
return result.(int64) == 1, nil
}
// GetConsumerInfo 获取消费者详细信息
func (rs *RedisStream) GetConsumerInfo(streamName, groupName string) ([]redis.XInfoConsumer, error) {
// XINFO CONSUMERS: 获取消费者详细信息
return rs.client.XInfoConsumers(rs.ctx, streamName, groupName).Result()
}
- Producer.go:
package main
import (
"fmt"
"os"
"time"
"Go_ENV/services"
"github.com/google/uuid"
)
func main() {
stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer stream.Close()
streamName := "scan:tasks"
groupName := "scanners"
// 创建消费者组
if err := stream.CreateConsumerGroup(streamName, groupName); err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer group: %v\n", err)
os.Exit(1)
}
fmt.Println("=== Redis Stream Producer (FIFO Demo) ===\n")
// 创建测试任务
tasks := []*services.ScanTask{
{
ID: uuid.New().String(),
Targets: []string{"127.0.0.1:8888"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com", "127.0.0.1:8888"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com"},
TemplateIDs: []string{"local-flask", "multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"127.0.0.1:8888"},
TemplateIDs: []string{"local-flask"},
CreatedAt: time.Now().UnixMilli(),
},
{
ID: uuid.New().String(),
Targets: []string{"www.baidu.com"},
TemplateIDs: []string{"multi-request-check"},
CreatedAt: time.Now().UnixMilli(),
},
}
// 添加任务到 Stream(按顺序添加,验证 FIFO)
fmt.Printf("📤 Adding %d tasks to stream: %s\n", len(tasks), streamName)
fmt.Println(" (Tasks will be consumed in FIFO order by Message ID)\n")
for i, task := range tasks {
// 添加小延迟,确保消息 ID 不同
if i > 0 {
time.Sleep(10 * time.Millisecond)
}
messageID, err := stream.AddTask(streamName, task)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to add task: %v\n", err)
continue
}
fmt.Printf("✓ Task %d: ID=%s, Targets=%v\n → Message ID: %s\n",
i+1, task.ID[:8], task.Targets, messageID)
}
// 显示 Stream 信息
length, _ := stream.GetStreamLength(streamName)
fmt.Printf("\n📊 Stream length: %d messages\n", length)
info, _ := stream.GetStreamInfo(streamName)
fmt.Printf("📊 First entry ID: %s (oldest, will be consumed first)\n", info.FirstEntry.ID)
fmt.Printf("📊 Last entry ID: %s (newest, will be consumed last)\n", info.LastEntry.ID)
groups, _ := stream.GetGroupInfo(streamName)
for _, group := range groups {
fmt.Printf("📊 Group: %s, Pending: %d, Consumers: %d\n",
group.Name, group.Pending, group.Consumers)
}
fmt.Println("\n✅ All tasks added in FIFO order!")
fmt.Println("💡 Run consumer to see tasks consumed in the same order")
}
- Consumer.go:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"Go_ENV/services"
nuclei "github.com/projectdiscovery/nuclei/v3/lib"
"github.com/projectdiscovery/nuclei/v3/pkg/catalog/disk"
"github.com/projectdiscovery/nuclei/v3/pkg/output"
"github.com/redis/go-redis/v9"
)
func main() {
stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
os.Exit(1)
}
defer stream.Close()
streamName := "scan:tasks"
groupName := "scanners"
consumerName := fmt.Sprintf("consumer-%d", time.Now().Unix()) //消费者名称
// 锁配置
const (
lockTTL = 6 * time.Minute // 锁的过期时间:6 分钟
lockRenewTime = 5 * time.Minute // 锁的续期间隔:5 分钟
)
fmt.Printf("=== Redis Stream Consumer (FIFO Mode with Distributed Lock) ===\n")
fmt.Printf("Consumer: %s\n", consumerName)
fmt.Printf("Max concurrent: 3\n")
fmt.Printf("Lock TTL: %v (自动续期间隔: %v)\n", lockTTL, lockRenewTime)
fmt.Printf("💡 Tasks will be consumed in FIFO order with distributed lock protection\n")
fmt.Printf("💡 Priority: PEL tasks → New tasks\n\n")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 优雅退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("\n🛑 Shutting down consumer...")
cancel()
}()
// 并发控制:最多 3 个任务同时运行
semaphore := make(chan struct{}, 3)
var wg sync.WaitGroup
// 主循环:持续消费任务
fmt.Println("🔄 Starting task consumption loop...")
for {
select {
case <-ctx.Done():
wg.Wait()
fmt.Println("✅ Consumer stopped")
return
default:
// 等待信号量可用(阻塞点)
semaphore <- struct{}{}
// 优先处理 PEL 中的任务
processed := false
pending, err := stream.GetPendingTasks(streamName, groupName)
if err == nil && len(pending) > 0 {
// 尝试认领并处理第一个超时的任务
for _, p := range pending {
// 认领超过 10 秒未确认的任务
if p.Idle < 10*time.Second {
continue
}
// 先尝试获取锁,如果获取失败说明任务正在被处理
locked, err := stream.AcquireTaskLock(p.ID, consumerName, lockTTL)
if err != nil {
fmt.Printf("[%s] ⚠️ PEL Task [%s] 检查锁失败: %v\n",
time.Now().Format("15:04:05.000"), p.ID[:8], err)
continue
}
if !locked {
// 锁已被其他消费者持有,说明任务正在被处理,跳过
fmt.Printf("[%s] ⏭️ PEL Task [%s] 已被其他消费者锁定,跳过\n",
time.Now().Format("15:04:05.000"), p.ID[:8])
continue
}
// 成功获取锁,说明任务未被处理,可以认领
fmt.Printf("[%s] 🔒 PEL Task [%s] 获取锁成功,准备认领\n",
time.Now().Format("15:04:05.000"), p.ID[:8])
// 认领任务
messages, err := stream.ClaimPendingTask(streamName, groupName, consumerName, 10*time.Second, p.ID)
if err != nil || len(messages) == 0 {
// 认领失败,释放锁
stream.ReleaseTaskLock(p.ID, consumerName)
fmt.Printf("[%s] ❌ PEL Task [%s] 认领失败: %v\n",
time.Now().Format("15:04:05.000"), p.ID[:8], err)
continue
}
msg := messages[0]
wg.Add(1)
// 注意:锁已经获取,在 processTaskWithLock 中不需要再次获取
go processTaskWithLockAcquired(ctx, stream, streamName, groupName, consumerName, msg, &wg, semaphore, true, lockTTL, lockRenewTime)
processed = true
break
}
}
// 如果没有处理 PEL 任务,则拉取新任务
if !processed {
messages, err := stream.ReadTasks(streamName, groupName, consumerName, 1, 1*time.Second)
if err != nil {
fmt.Fprintf(os.Stderr, "Read error: %v\n", err)
<-semaphore // 释放信号量
time.Sleep(100 * time.Millisecond)
continue
}
if messages == nil || len(messages) == 0 {
<-semaphore // 释放信号量
continue
}
msg := messages[0]
wg.Add(1)
go processTaskWithLock(ctx, stream, streamName, groupName, consumerName, msg, &wg, semaphore, false, lockTTL, lockRenewTime)
}
}
}
}
// processTaskWithLock 处理任务(带分布式锁)
func processTaskWithLock(ctx context.Context, stream *services.RedisStream, streamName, groupName, consumerName string,
msg redis.XMessage, wg *sync.WaitGroup, semaphore chan struct{}, isRecovered bool, lockTTL, lockRenewTime time.Duration) {
startTime := time.Now()
defer func() {
duration := time.Since(startTime)
<-semaphore
fmt.Printf("[%s] 🔓 Semaphore released (took %v)\n",
time.Now().Format("15:04:05.000"), duration)
wg.Done()
}()
task, err := services.ParseTask(msg)
if err != nil {
fmt.Printf("❌ Parse error: %v\n", err)
return
}
// 尝试获取分布式锁
locked, err := stream.AcquireTaskLock(msg.ID, consumerName, lockTTL)
if err != nil {
fmt.Printf("[%s] ❌ Task [%s] Failed to acquire lock: %v\n",
time.Now().Format("15:04:05.000"), task.ID[:8], err)
return
}
if !locked {
// 锁已被其他消费者持有,跳过此任务
fmt.Printf("[%s] ⏭️ Task [%s] Already locked by another consumer, skipping\n",
time.Now().Format("15:04:05.000"), task.ID[:8])
return
}
// 成功获取锁,确保最后释放
defer func() {
if err := stream.ReleaseTaskLock(msg.ID, consumerName); err != nil {
fmt.Printf("[%s] ⚠️ Task [%s] Failed to release lock: %v\n",
time.Now().Format("15:04:05.000"), task.ID[:8], err)
}
}()
processTaskCore(ctx, stream, streamName, groupName, consumerName, msg, task, lockTTL, lockRenewTime, isRecovered)
}
// processTaskWithLockAcquired 处理任务(锁已获取)
func processTaskWithLockAcquired(ctx context.Context, stream *services.RedisStream, streamName, groupName, consumerName string,
msg redis.XMessage, wg *sync.WaitGroup, semaphore chan struct{}, isRecovered bool, lockTTL, lockRenewTime time.Duration) {
startTime := time.Now()
defer func() {
duration := time.Since(startTime)
<-semaphore
fmt.Printf("[%s] 🔓 Semaphore released (took %v)\n",
time.Now().Format("15:04:05.000"), duration)
wg.Done()
}()
task, err := services.ParseTask(msg)
if err != nil {
fmt.Printf("❌ Parse error: %v\n", err)
// 解析失败,释放锁
stream.ReleaseTaskLock(msg.ID, consumerName)
return
}
// 锁已经在外部获取,确保最后释放
defer func() {
if err := stream.ReleaseTaskLock(msg.ID, consumerName); err != nil {
fmt.Printf("[%s] ⚠️ Task [%s] Failed to release lock: %v\n",
time.Now().Format("15:04:05.000"), task.ID[:8], err)
}
}()
processTaskCore(ctx, stream, streamName, groupName, consumerName, msg, task, lockTTL, lockRenewTime, isRecovered)
}
// processTaskCore 任务处理核心逻辑
func processTaskCore(ctx context.Context, stream *services.RedisStream, streamName, groupName, consumerName string,
msg redis.XMessage, task *services.ScanTask, lockTTL, lockRenewTime time.Duration, isRecovered bool) {
recoveredTag := ""
if isRecovered {
recoveredTag = " [RECOVERED]"
}
fmt.Printf("\n[%s] 🔒 Task [%s]%s Lock acquired, starting scan\n",
time.Now().Format("15:04:05.000"), task.ID[:8], recoveredTag)
fmt.Printf(" Message ID: %s\n", msg.ID)
fmt.Printf(" Targets: %v\n", task.Targets)
fmt.Printf(" Templates: %v\n", task.TemplateIDs)
fmt.Printf(" Lock TTL: %v (续期间隔: %v)\n", lockTTL, lockRenewTime)
// 启动锁续期 goroutine
renewCtx, cancelRenew := context.WithCancel(ctx)
defer cancelRenew()
go func() {
ticker := time.NewTicker(lockRenewTime)
defer ticker.Stop()
renewCount := 0
for {
select {
case <-renewCtx.Done():
if renewCount > 0 {
fmt.Printf("[%s] 🛑 Task [%s] 停止锁续期(共续期 %d 次)\n",
time.Now().Format("15:04:05.000"), task.ID[:8], renewCount)
}
return
case <-ticker.C:
renewed, err := stream.RenewTaskLock(msg.ID, consumerName, lockTTL)
if err != nil || !renewed {
fmt.Printf("[%s] ⚠️ Task [%s] 锁续期失败: %v\n",
time.Now().Format("15:04:05.000"), task.ID[:8], err)
} else {
renewCount++
fmt.Printf("[%s] 🔄 Task [%s] 锁续期成功(第 %d 次,TTL 重置为 %v)\n",
time.Now().Format("15:04:05.000"), task.ID[:8], renewCount, lockTTL)
}
}
}
}()
// 执行 Nuclei 扫描
if err := runNucleiScan(ctx, task); err != nil {
fmt.Printf("[%s] ❌ Task [%s] Scan failed: %v (will retry)\n",
time.Now().Format("15:04:05.000"), task.ID[:8], err)
// 不发送 ACK,任务会被重新分配
return
}
// 任务成功,发送 ACK
if err := stream.AckTask(streamName, groupName, msg.ID); err != nil {
fmt.Printf("❌ ACK error: %v\n", err)
return
}
fmt.Printf("[%s] ✅ Task [%s] Completed and ACKed\n",
time.Now().Format("15:04:05.000"), task.ID[:8])
}
// recoverPendingTasks 恢复未确认的任务(故障恢复)- 已废弃,逻辑合并到主循环
// func recoverPendingTasks(stream *services.RedisStream, streamName, groupName, consumerName string, wg *sync.WaitGroup, semaphore chan struct{}) {
// // 此函数已废弃,PEL 任务处理已合并到主循环中
// }
// runNucleiScan 执行 Nuclei 扫描
func runNucleiScan(ctx context.Context, task *services.ScanTask) error {
if len(task.Targets) == 0 {
return fmt.Errorf("targets is empty")
}
// 获取模板目录
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("get home dir failed: %w", err)
}
templateDir := filepath.Join(home, "nuclei-templates")
// 配置 Nuclei 选项
opts := []nuclei.NucleiSDKOptions{
nuclei.WithCatalog(disk.NewCatalog(templateDir)),
nuclei.DisableUpdateCheck(),
}
// 如果指定了模板 ID,添加过滤器
if len(task.TemplateIDs) > 0 {
opts = append(opts, nuclei.WithTemplateFilters(nuclei.TemplateFilters{
IDs: task.TemplateIDs,
}))
}
// 创建 Nuclei 引擎
engine, err := nuclei.NewNucleiEngineCtx(ctx, opts...)
if err != nil {
return fmt.Errorf("create nuclei engine failed: %w", err)
}
defer engine.Close()
// 加载模板
if err := engine.LoadAllTemplates(); err != nil {
return fmt.Errorf("load templates failed: %w", err)
}
// 加载目标
engine.LoadTargets(task.Targets, true)
// 执行扫描并处理结果
fmt.Printf(" ⏳ Task [%s] Scanning...\n", task.ID[:8])
return engine.ExecuteCallbackWithCtx(ctx, func(ev *output.ResultEvent) {
if ev == nil || !ev.MatcherStatus {
return
}
severity := ev.Info.SeverityHolder.Severity.String()
if severity == "" {
severity = "unknown"
}
fmt.Printf(" [VULN] Task [%s] severity=%s template=%s name=%s matched=%s\n",
task.ID[:8],
severity,
ev.TemplateID,
ev.Info.Name,
ev.Matched,
)
})
}
随后先运行producer.go文件:
=== Redis Stream Producer (FIFO Demo) ===
📤 Adding 5 tasks to stream: scan:tasks
(Tasks will be consumed in FIFO order by Message ID)
✓ Task 1: ID=ba37e680, Targets=[127.0.0.1:8888]
→ Message ID: 1777369149831-0
✓ Task 2: ID=9df0c69e, Targets=[www.baidu.com 127.0.0.1:8888]
→ Message ID: 1777369149845-0
✓ Task 3: ID=8858fa11, Targets=[www.baidu.com]
→ Message ID: 1777369149857-0
✓ Task 4: ID=9ce2703a, Targets=[127.0.0.1:8888]
→ Message ID: 1777369149869-0
✓ Task 5: ID=1513defc, Targets=[www.baidu.com]
→ Message ID: 1777369149880-0
📊 Stream length: 5 messages
📊 First entry ID: 1777369149831-0 (oldest, will be consumed first)
📊 Last entry ID: 1777369149880-0 (newest, will be consumed last)
📊 Group: scanners, Pending: 0, Consumers: 0
✅ All tasks added in FIFO order!
💡 Run consumer to see tasks consumed in the same order
随后运行consumer.go文件即可:
=== Redis Stream Consumer (FIFO Mode with Distributed Lock) ===
Consumer: consumer-1777369488
Max concurrent: 3
Lock TTL: 6m0s (自动续期间隔: 5m0s)
💡 Tasks will be consumed in FIFO order with distributed lock protection
💡 Priority: PEL tasks → New tasks
🔄 Starting task consumption loop...
[17:44:48.721] 🔒 Task [3c4cf862] Lock acquired, starting scan
Message ID: 1777369458950-0
Targets: [www.baidu.com]
Templates: [local-flask multi-request-check]
Lock TTL: 6m0s (续期间隔: 5m0s)
[17:44:48.722] 🔒 Task [8ad74325] Lock acquired, starting scan
Message ID: 1777369458938-0
Targets: [www.baidu.com 127.0.0.1:8888]
Templates: [local-flask multi-request-check]
Lock TTL: 6m0s (续期间隔: 5m0s)
[17:44:48.722] 🔒 Task [07ff4871] Lock acquired, starting scan
Message ID: 1777369458927-0
Targets: [127.0.0.1:8888]
Templates: [local-flask multi-request-check]
Lock TTL: 6m0s (续期间隔: 5m0s)
⏳ Task [07ff4871] Scanning...
[VULN] Task [07ff4871] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[17:44:49.931] ✅ Task [07ff4871] Completed and ACKed
[17:44:49.934] 🔓 Semaphore released (took 1.21409825s)
[17:44:49.934] 🔒 Task [e02178f0] Lock acquired, starting scan
Message ID: 1777369458961-0
Targets: [127.0.0.1:8888]
Templates: [local-flask]
Lock TTL: 6m0s (续期间隔: 5m0s)
⏳ Task [e02178f0] Scanning...
[VULN] Task [e02178f0] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[17:44:50.054] ✅ Task [e02178f0] Completed and ACKed
[17:44:50.054] 🔓 Semaphore released (took 119.452541ms)
[17:44:50.054] 🔒 Task [f89f35ca] Lock acquired, starting scan
Message ID: 1777369458971-0
Targets: [www.baidu.com]
Templates: [multi-request-check]
Lock TTL: 6m0s (续期间隔: 5m0s)
⏳ Task [8ad74325] Scanning...
[VULN] Task [8ad74325] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
⏳ Task [f89f35ca] Scanning...
[VULN] Task [8ad74325] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[17:44:50.633] ✅ Task [8ad74325] Completed and ACKed
[17:44:50.633] 🔓 Semaphore released (took 1.913249584s)
[VULN] Task [f89f35ca] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[17:44:50.871] ✅ Task [f89f35ca] Completed and ACKed
[17:44:50.871] 🔓 Semaphore released (took 817.130709ms)
⏳ Task [3c4cf862] Scanning...
[VULN] Task [3c4cf862] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[17:44:51.607] ✅ Task [3c4cf862] Completed and ACKed
[17:44:51.607] 🔓 Semaphore released (took 2.887172375s)
代码实现细则以及部分知识点:
- 这里消费者的大概实现过程如下:

-
分布式锁和redis stream是不同的概念,就算你给消息加了分布式锁,但还是会被拿到另一个消费者的pel中,因为redis分布式锁其实本质就是一个键值对的功能性拓展,通过在redis数据库中设置一个唯一性、有过期时间的键值对,并且在应用层中对消息进行处理前,需要先看是否对这个消息加了锁,若能成功加锁,就代表这个消息没有正在被处理,但是这样做的话就必须保证键的唯一性,也就是新加的锁要和原本加的锁的键要一样,防止对同一条消息加多个锁,如何实现呢?解决思路就是通过messageID作为唯一性的标配,原因如下:
- 最关键的就是redis stream的消息链表里面的messageID是时间戳-序号,可以很好地保证messageID的唯一性
- 其次在redis stream中,对应消息的相关messageID是不会变化的,就算在不同消费者的pel中,其messageID都是一样的,可以基于这个来对这个消息的锁作出定义。
-
其他就没啥好说的了,实现了前面说的在运行时每隔五分钟重置锁的过期时间为6分钟,并且在运行结束后有效删除锁等内容,看一下代码就懂了。
redis的令牌桶算法
令牌桶算法(Token Bucket Algorithm),个人实现的dast中非常重要的一部分,用来做qps限制,以代码形式表示令牌桶的结果大概如下:
type TokenBucket struct {
capacity int64 // 桶最大容量
tokens float64 // 当前令牌数量
rate float64 // 每秒生成令牌速度
lastRefillTime int64 // 上次补充时间
}
这四个变量就可以用于描述整个限流状态。此算法实现的大致原理是:每次请求到达时,都会根据当前时间-上次补充时间(lastRefillTime),结合rate来进行计算需要更新的令牌并补充(但不会超过capacity),然后如果桶中的令牌足够,就会允许请求进行下去,否则就拒绝/让请求等待token补充,流程大致如下:
sequenceDiagram
participant Client as 请求
participant Bucket as TokenBucket
Client->>Bucket: Allow()
Bucket->>Bucket: 获取当前时间 now
Bucket->>Bucket: now - lastRefillTime
Bucket->>Bucket: 计算新增 token
Bucket->>Bucket: 更新 tokens
Bucket->>Bucket: 判断 token 是否足够
alt token 足够
Bucket->>Bucket: 扣减 token
Bucket-->>Client: 放行
else token 不够
Bucket-->>Client: 拒绝
end
Bucket->>Bucket: 更新 lastRefillTime
注意这里并不是定时器那种定时往桶里面加令牌,这样设置可以让整体的token添加更合理,如果是定时器那种,长时间未运行任务,计时器就会一直尝试加令牌,即使令牌桶已经满了,消耗服务器资源,不如通过令牌桶算法来在每次的实际请求中按需推导状态并按需添加令牌。具体计算方式是通过lua脚本实现的,这里就不多说了。
————————
基本概念如上,在dast的架构构建中,考虑下来,还是要不同模块设置不同的qps,因为如果只对一个策略做整体的qps限制,由于每个模块的扫描方式不同,无法做到兼顾,还是每个模块做各自的qps限制。那么在实际设置中,需要考虑到如下问题:
- 考虑到架构设计,整体扫描是流水线形式,而qps肯定是针对不同的任务里面的目标的,故不能只在模块流水线里面设置通用的qps,需要针对每个
在后续了解中,发现可以直接通过golang.org/x/time/rate这个库来解决,直接在代码中针对单ip进行速率限制即可,redis相关的令牌桶算法更适合全局下的限制,和我dast架构设计不适很符合,这里暂时不涉及。
缓存?
后续看需不需要详细看看