fupanc's blog

go-redis

go-redis是Go编程语言的官方Redis客户端库,通过提供简单易用的接口来实现与redis服务器的交互。而redis在很多开发场景中都能起到重要作用,这里借助这个库来了解一下redis的相关用法。

go-redis库下载方式:

go get github.com/redis/go-redis/v9

——————————

另外需要注意这个sdk只是可以通过代码来操作redis数据库,故环境中需要存在对应的数据库,macos的下载方式如下:

brew install redis #下载
brew services start redis #开机自动运行,会作为后台常驻服务
brew services stop redis #停止服务
brew services restart redis #重启服务
brew services list #查看服务状态

按需使用命令即可。

简单了解

Redis常见数据类型:

等数据类型,这里就简单看看如上几个就行,后续使用中也许会用到。

后续通过代码来学习go如何操作redis以及redis的一些使用场景。如下string类型的代码

package main

import (
	"context"
	"fmt"
	"github.com/redis/go-redis/v9"
	"time"
)

var ctx = context.Background() //让同一批次的操作都在同一个上下文中,方便管理

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "redis",
		DB:       0, //使用默认 DB
	})
	defer rdb.Close()

	//string类型键值对基础代码实例:
	err := rdb.Set(ctx, "key", "value", 0).Err() //0代表永久不过期
	if err != nil {
		panic(err)
	}
	val, err := rdb.Get(ctx, "key").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("放入的key:", val)
	//删除操作
	err = rdb.Del(ctx, "key").Err()
	if err != nil {
		panic(err)
	}
	val, err = rdb.Get(ctx, "key").Result()
	if err == redis.Nil {
		fmt.Println("成功删除key")
	} else {
		panic(err)
	}
	fmt.Println("删除后的key:", val)

	//设置过期时间:
	err = rdb.Set(ctx, "key1", "value1", 5*time.Second).Err() //设置5秒的延时
	if err != nil {
		panic(err)
	}
	val, err = rdb.Get(ctx, "key1").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("放入的key1:", val)
	//延时5秒
	time.Sleep(5 * time.Second)
	val, err = rdb.Get(ctx, "key1").Result()
	if err == redis.Nil {
		fmt.Println("key1已被删除")
	} else {
		panic(err)
	}
	fmt.Println("key1的值:", val)
}

效果如下:

放入的key: value
成功删除key
删除后的key: 
放入的key1: value1
key1已被删除
key1的值: 

代码简单实现了连接redis并且往里面放入以及取出键值对。

hash存储的代码

package main

import (
	"context"
	"fmt"
	"github.com/redis/go-redis/v9"
)

var ctx = context.Background() //让同一批次的操作都在同一个上下文中,方便管理

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "redis",
		DB:       0, //使用默认 DB
	})
	defer rdb.Close()

	//操作redis中的hash类型:
	err := rdb.HSet(ctx, "mykey", "myval", "hello").Err()
	if err != nil {
		panic(err)
	}
	err = rdb.HSet(ctx, "mykey", "myval1", "hello1").Err()
	if err != nil {
		panic(err)
	}

	val, err := rdb.HGet(ctx, "mykey", "myval").Result()
	if err != nil {
		panic(err)
	}
	val1, err := rdb.HGet(ctx, "mykey", "myval1").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("mykey分类下的值:", val, val1)
}

运行效果:

mykey分类下的值: hello hello1

简单来说就是实现了一种分类的操作,可以看成是在mykey这个分类下存放了几个键值对的集合。如下形象的分类:

Key: task99
Fields:
  - target: "example.com"
  - plugin: "xss_detector"
  - status: "running"
  - start_time: "2026-04-21"

以集合的方式方便存储不同需求的数据,有利于对redis中存储的数据的结构的理解以及修改。

功能场景

redis在实际使用中的具体功能特点,下述主要是我在dast中可能用到的功能,看看如下场景中利用到了redis的什么功能特点。

消息队列

预期实现场景:管理已下发的任务的进行,采用基于队列的先进先出策略,基于redis的List数据类型实现扫描任务按顺序下发。

这里就直接使用nuclei扫描模块的demo来作为任务下发,然后再通过redis来实现任务管理。就是生产者消费者问题,通过信号量机制来实现对任务并发的控制,可以通过两种方式来实现任务队列:

下面分别讲一下要如何实现。

List类型

列表类型的数据天生适合用于实现消息队列,并且redis还提供了一些命令用于保证需求的实现,这里我们主要是通过go-redis库来操作redis,而go-redis库提供了 LPUSH 从左边推入数据,使用 BRPOP 从右边弹出数据,满足实现先进先出的策略。

实现代码如下:

package services

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	goredis "github.com/redis/go-redis/v9"
)

// ScanTask 扫描任务结构
type ScanTask struct {
	ID          string   `json:"id"`           // 任务唯一ID
	Targets     []string `json:"targets"`      // 扫描目标列表
	TemplateIDs []string `json:"template_ids"` // 模板ID列表
	CreatedAt   int64    `json:"created_at"`   // 创建时间戳
}

// RedisQueue Redis 队列管理器(FIFO 先进先出)
type RedisQueue struct {
	client *goredis.Client
	ctx    context.Context
}

// NewRedisQueue 创建 Redis 队列管理器
func NewRedisQueue(addr, password string, db int) (*RedisQueue, error) {
	client := goredis.NewClient(&goredis.Options{
		Addr:         addr,
		Password:     password,
		DB:           db,
		DialTimeout:  5 * time.Second,
		ReadTimeout:  3 * time.Second,
		WriteTimeout: 3 * time.Second,
	})

	ctx := context.Background()
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, fmt.Errorf("redis connection failed: %w", err)
	}

	return &RedisQueue{
		client: client,
		ctx:    ctx,
	}, nil
}

// Close 关闭 Redis 连接
func (rq *RedisQueue) Close() error {
	return rq.client.Close()
}

// PushTask 将任务推送到队列(FIFO:先进先出)
// 使用 LPUSH 从左边推入,配合 BRPOP 从右边弹出,实现先进先出
func (rq *RedisQueue) PushTask(queueName string, task *ScanTask) error {
	taskJSON, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task failed: %w", err)
	}

	// LPUSH: 从列表左侧插入
	if err := rq.client.LPush(rq.ctx, queueName, taskJSON).Err(); err != nil {
		return fmt.Errorf("push task to queue failed: %w", err)
	}

	fmt.Printf("[Producer] Task %s pushed to queue: %s\n", task.ID, queueName)
	return nil
}

// PopTask 从队列中弹出任务(阻塞式,FIFO)
// 使用 BRPOP 从右边弹出,如果队列为空则阻塞等待
func (rq *RedisQueue) PopTask(queueName string, timeout time.Duration) (*ScanTask, error) {
	// BRPOP: 从列表右侧弹出,阻塞等待直到有数据或超时
	result, err := rq.client.BRPop(rq.ctx, timeout, queueName).Result()
	if err != nil {
		if err == goredis.Nil {
			return nil, nil // 超时,没有任务
		}
		return nil, fmt.Errorf("pop task from queue failed: %w", err)
	}

	// result[0] 是队列名,result[1] 是数据
	var task ScanTask
	if err := json.Unmarshal([]byte(result[1]), &task); err != nil {
		return nil, fmt.Errorf("unmarshal task failed: %w", err)
	}

	fmt.Printf("[Consumer] Task %s popped from queue: %s\n", task.ID, queueName)
	return &task, nil
}

// GetQueueLength 获取队列长度
func (rq *RedisQueue) GetQueueLength(queueName string) (int64, error) {
	length, err := rq.client.LLen(rq.ctx, queueName).Result()
	if err != nil {
		return 0, fmt.Errorf("get queue length failed: %w", err)
	}
	return length, nil
}

// ClearQueue 清空队列
func (rq *RedisQueue) ClearQueue(queueName string) error {
	if err := rq.client.Del(rq.ctx, queueName).Err(); err != nil {
		return fmt.Errorf("clear queue failed: %w", err)
	}
	fmt.Printf("[Manager] Queue cleared: %s\n", queueName)
	return nil
}
package main

import (
	"fmt"
	"os"
	"time"

	"Go_ENV/services"

	"github.com/google/uuid"
)

// 生产者示例:将扫描任务推送到 Redis 队列(FIFO 先进先出)
func main() {
	// 1. 连接 Redis
	queue, err := services.NewRedisQueue("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer queue.Close()

	fmt.Println("=== Redis FIFO Queue Producer ===\n")

	// 2. 创建扫描任务
	tasks := []*services.ScanTask{
		{
			ID:          uuid.New().String(),
			Targets:     []string{"127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().Unix(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com", "127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().Unix(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().Unix(),
		},
	}

	// 3. 推送任务到队列(先进先出)
	queueName := "nuclei:scan:queue"
	fmt.Printf("Pushing %d tasks to queue: %s\n\n", len(tasks), queueName)

	for i, task := range tasks {
		if err := queue.PushTask(queueName, task); err != nil {
			fmt.Fprintf(os.Stderr, "Failed to push task: %v\n", err)
			continue
		}
		fmt.Printf("✓ Task %d: %s (targets: %v)\n", i+1, task.ID, task.Targets)
	}

	// 4. 查看队列长度
	length, err := queue.GetQueueLength(queueName)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to get queue length: %v\n", err)
	} else {
		fmt.Printf("\n📊 Current queue length: %d\n", length)
	}

	fmt.Println("\n✅ All tasks pushed successfully!")
	fmt.Println("💡 Run consumer.go to process these tasks")
}
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"Go_ENV/services"

	nuclei "github.com/projectdiscovery/nuclei/v3/lib"
	"github.com/projectdiscovery/nuclei/v3/pkg/catalog/disk"
	"github.com/projectdiscovery/nuclei/v3/pkg/output"
)

// 批处理模式消费者:处理完所有任务后自动退出
func main() {
	// 1. 连接 Redis
	queue, err := services.NewRedisQueue("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer queue.Close()

	fmt.Println("=== Redis Queue Consumer (Batch Mode) ===")
	fmt.Println("Max concurrent tasks: 3")
	fmt.Println("Will exit when all tasks are completed\n")

	// 2. 设置优雅退出
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	go func() {
		<-sigChan
		fmt.Println("\n\n🛑 Shutting down consumer...")
		cancel()
	}()

	// 3. 创建信号量控制并发数(最多 2 个)
	semaphore := make(chan struct{}, 2)

	// 4. 使用 WaitGroup 等待所有任务完成
	var wg sync.WaitGroup

	// 5. 队列名称
	queueName := "nuclei:scan:queue"

	// 6. 先检查队列长度
	initialLength, err := queue.GetQueueLength(queueName)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to get queue length: %v\n", err)
		os.Exit(1)
	}

	if initialLength == 0 {
		fmt.Println("📭 Queue is empty, nothing to process")
		return
	}

	fmt.Printf("📊 Found %d tasks in queue\n\n", initialLength)

	// 7. 处理任务的循环
	emptyCount := 0
	maxEmptyCount := 3 // 连续 3 次为空就认为队列已清空

	for {
		select {
		case <-ctx.Done():
			fmt.Println("✅ Consumer stopped by signal")
			wg.Wait() // 等待所有正在运行的任务完成
			return
		default:
			// 从队列中获取任务(短超时,避免长时间阻塞)
			task, err := queue.PopTask(queueName, 1*time.Second)
			if err != nil {
				fmt.Fprintf(os.Stderr, "❌ Failed to pop task: %v\n", err)
				continue
			}

			if task == nil {
				// 没有任务
				emptyCount++
				fmt.Printf("📭 No tasks available (%d/%d)\n", emptyCount, maxEmptyCount)

				if emptyCount >= maxEmptyCount {
					fmt.Println("\n🎉 All tasks completed! Waiting for running tasks to finish...")
					wg.Wait() // 等待所有正在运行的任务完成
					fmt.Println("✅ All tasks finished, exiting")
					return
				}
				continue
			}

			// 重置空队列计数
			emptyCount = 0

			// 获取信号量(如果已有 2 个任务在运行,这里会阻塞)
			fmt.Printf("\n[%s] 🔄 Trying to acquire semaphore... (current: %d/2)\n", 
				time.Now().Format("15:04:05.000"), len(semaphore))
			semaphore <- struct{}{}
			fmt.Printf("[%s] ✅ Semaphore acquired! Current concurrency: %d/2\n", 
				time.Now().Format("15:04:05.000"), len(semaphore))

			// 增加 WaitGroup 计数
			wg.Add(1)

			// 启动 goroutine 处理任务
			go func(t *services.ScanTask) {
				startTime := time.Now()
				taskIDShort := t.ID[:8] // 只显示前 8 位
				
				defer func() {
					duration := time.Since(startTime)
					<-semaphore // 释放信号量
					fmt.Printf("[%s] 🔓 Semaphore released! Current concurrency: %d/2 (Task %s took %v)\n", 
						time.Now().Format("15:04:05.000"), len(semaphore), taskIDShort, duration)
					wg.Done()   // 减少 WaitGroup 计数
				}()

				fmt.Printf("\n[%s] 🔍 [%s] Starting scan\n", time.Now().Format("15:04:05.000"), taskIDShort)
				fmt.Printf("   Targets: %v\n", t.Targets)
				fmt.Printf("   Templates: %v\n", t.TemplateIDs)
				fmt.Printf("   Created: %s\n", time.Unix(t.CreatedAt, 0).Format("2006-01-02 15:04:05"))

				// 执行 Nuclei 扫描
				if err := runNucleiScan(ctx, t); err != nil {
					fmt.Fprintf(os.Stderr, "[%s] ❌  [%s] Scan failed: %v\n", 
						time.Now().Format("15:04:05.000"), taskIDShort, err)
				} else {
					fmt.Printf("[%s] ✅  [%s] Scan completed\n", 
						time.Now().Format("15:04:05.000"), taskIDShort)
				}
			}(task)
		}
	}
}

// runNucleiScan 执行 Nuclei 扫描任务
func runNucleiScan(ctx context.Context, task *services.ScanTask) error {
	if len(task.Targets) == 0 {
		return fmt.Errorf("targets is empty")
	}

	// 获取模板目录
	home, err := os.UserHomeDir()
	if err != nil {
		return fmt.Errorf("get home dir failed: %w", err)
	}
	templateDir := filepath.Join(home, "nuclei-templates")

	// 配置 Nuclei 选项
	opts := []nuclei.NucleiSDKOptions{
		nuclei.WithCatalog(disk.NewCatalog(templateDir)),
		nuclei.DisableUpdateCheck(),
	}

	// 如果指定了模板 ID,添加过滤器
	if len(task.TemplateIDs) > 0 {
		opts = append(opts, nuclei.WithTemplateFilters(nuclei.TemplateFilters{
			IDs: task.TemplateIDs,
		}))
	}

	// 创建 Nuclei 引擎
	engine, err := nuclei.NewNucleiEngineCtx(ctx, opts...)
	if err != nil {
		return fmt.Errorf("create nuclei engine failed: %w", err)
	}
	defer engine.Close()

	// 加载模板
	if err := engine.LoadAllTemplates(); err != nil {
		return fmt.Errorf("load templates failed: %w", err)
	}

	// 加载目标
	engine.LoadTargets(task.Targets, true)

	// 执行扫描并处理结果
	fmt.Printf("   [%s] ⏳ [%s] Scanning...\n", time.Now().Format("15:04:05.000"), task.ID[:8])
	return engine.ExecuteCallbackWithCtx(ctx, func(ev *output.ResultEvent) {
		if ev == nil || !ev.MatcherStatus {
			return
		}

		severity := ev.Info.SeverityHolder.Severity.String()
		if severity == "" {
			severity = "unknown"
		}

		fmt.Printf("   [%s] [VULN] [%s] severity=%s template=%s name=%s matched=%s\n",
			time.Now().Format("15:04:05.000"),
			task.ID[:8],
			severity,
			ev.TemplateID,
			ev.Info.Name,
			ev.Matched,
		)
	})
}

这里的代码设置了需要下发三个任务,然后并发设置成两个,用以模拟对任务下发的控制,最后效果如下:

运行producer.go:

image-20260424174816430

随后可以看一下redis中存储的数据:

image-20260424175347365

成功放入三个待下发任务的task的相关信息。

然后运行consumer.go:

=== Redis Queue Consumer (Batch Mode) ===
Max concurrent tasks: 3
Will exit when all tasks are completed

📊 Found 3 tasks in queue

[Consumer] Task f98fd49a-8c16-4c6e-b1fe-9e5a4246c76a popped from queue: nuclei:scan:queue

[18:23:53.491] 🔍 [f98fd49a] Starting scan
[Consumer] Task d81dba44-34dc-4654-9bb8-518192f51178 popped from queue: nuclei:scan:queue
   Targets: [127.0.0.1:8888]
   Templates: [local-flask multi-request-check]

[18:23:53.491] 🔍 [d81dba44] Starting scan
   Created: 2026-04-24 18:23:30
   Targets: [www.baidu.com 127.0.0.1:8888]
   Templates: [local-flask multi-request-check]
   Created: 2026-04-24 18:23:30
[Consumer] Task 403345d3-09bb-4c44-aa11-7efe729d601c popped from queue: nuclei:scan:queue
   [18:23:55.676] ⏳ [f98fd49a] Scanning...
   [18:23:55.679] [VULN] [f98fd49a] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[18:23:55.683] ✅  [f98fd49a] Scan completed
[18:23:55.683] 🔓 Semaphore released! Current concurrency: 2/2 (Task f98fd49a took 2.191739208s)

[18:23:55.683] 🔍 [403345d3] Starting scan
   Targets: [www.baidu.com]
   Templates: [local-flask multi-request-check]
   Created: 2026-04-24 18:23:30
   [18:23:55.895] ⏳ [d81dba44] Scanning...
   [18:23:55.899] [VULN] [d81dba44] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
   [18:23:56.021] ⏳ [403345d3] Scanning...
   [18:23:56.408] [VULN] [d81dba44] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[18:23:56.413] ✅  [d81dba44] Scan completed
[18:23:56.414] 🔓 Semaphore released! Current concurrency: 1/2 (Task d81dba44 took 2.922455292s)
   [18:23:56.482] [VULN] [403345d3] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[18:23:56.495] ✅  [403345d3] Scan completed
[18:23:56.495] 🔓 Semaphore released! Current concurrency: 0/2 (Task 403345d3 took 812.515125ms)
📭 No tasks available (1/3)
📭 No tasks available (2/3)
📭 No tasks available (3/3)

🎉 All tasks completed! Waiting for running tasks to finish...
✅ All tasks finished, exiting

观察结果就可以知道成功实现了任务数量的控制,第三个任务必须在第二个任务结束释放信号量后才能下发任务,这里是因为将取任务放在前面的,所以一进入循环就会取任务,然后才是判断信号量是否有空的,当然可以尝试将代码改成优先判断信号量是否有空的,然乎才去取数据,这里不多说。

这里涉及到的知识点:

另外的需要考虑的点:

Redis Stream

Redis Stream是一个仅追加(Append-only)的消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容,用redis的key来区分不同的stream,其具有如下功能:

整体结构大概如下:

image-20260425173752077

其中的关键节点如下:

并且在取消息时,Redis Stream也是严格按照ID递增来取的,故非常符合先进先出策略,由此我们也可以通过redis stream来实现消息队列。

代码实现如下:

package services

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// ScanTask 扫描任务
type ScanTask struct {
	ID          string
	Targets     []string
	TemplateIDs []string
	CreatedAt   int64
}

// RedisStream Redis Stream 管理器
type RedisStream struct {
	client *redis.Client
	ctx    context.Context
}

// NewRedisStream 创建 Stream 管理器
func NewRedisStream(addr, password string, db int) (*RedisStream, error) {
	client := redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: password,
		DB:       db,
	})

	ctx := context.Background()
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, fmt.Errorf("redis connection failed: %w", err)
	}

	return &RedisStream{client: client, ctx: ctx}, nil
}

func (rs *RedisStream) Close() error {
	return rs.client.Close()
}

// CreateConsumerGroup 创建消费者组
func (rs *RedisStream) CreateConsumerGroup(streamName, groupName string) error {
	// 尝试创建消费者组,如果已存在会返回错误(忽略)
	err := rs.client.XGroupCreateMkStream(rs.ctx, streamName, groupName, "0").Err()
	if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
		return err
	}
	return nil
}

// AddTask 添加任务到 Stream(FIFO:按消息 ID 顺序)
func (rs *RedisStream) AddTask(streamName string, task *ScanTask) (string, error) {
	// 直接将整个任务序列化为 JSON
	taskJSON, err := json.Marshal(task)
	if err != nil {
		return "", fmt.Errorf("marshal task failed: %w", err)
	}

	// XADD: 添加消息到 Stream,Redis 自动生成递增的消息 ID
	// 消息 ID 格式:时间戳(毫秒)-序列号,保证严格递增(FIFO)
	messageID, err := rs.client.XAdd(rs.ctx, &redis.XAddArgs{
		Stream: streamName,
		Values: map[string]interface{}{
			"task": string(taskJSON), // 整个任务作为一个 JSON 字段
		},
	}).Result()

	if err != nil {
		return "", err
	}

	return messageID, nil
}

// ReadTasks 从 Stream 读取任务(消费者组模式,FIFO 顺序)
func (rs *RedisStream) ReadTasks(streamName, groupName, consumerName string, count int64, block time.Duration) ([]redis.XMessage, error) {
	// XREADGROUP: 从消费者组读取消息
	// ">" 表示只读取未被该消费者组消费的新消息
	// Redis 按消息 ID 字典序返回,保证 FIFO(先进先出)
	streams, err := rs.client.XReadGroup(rs.ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: consumerName,
		Streams:  []string{streamName, ">"}, // ">" = 读取新消息(未被消费的)
		Count:    count,                     // 批量读取数量
		Block:    block,                     // 阻塞等待时间,用来redis是直接返回空还是让消费者等待一定时间看是否有新的消息进来,0代表无限阻塞直到新消息进来,
	}).Result()

	if err != nil {
		if err == redis.Nil {
			return nil, nil // 没有新消息
		}
		return nil, err
	}

	if len(streams) == 0 || len(streams[0].Messages) == 0 {
		return nil, nil
	}

	// 返回的消息已按消息 ID 排序(FIFO)
	return streams[0].Messages, nil
}

// AckTask 确认任务已处理
func (rs *RedisStream) AckTask(streamName, groupName string, messageIDs ...string) error {
	// XACK: 确认消息已处理
	return rs.client.XAck(rs.ctx, streamName, groupName, messageIDs...).Err()
}

// GetPendingTasks 获取未确认的任务(用于故障恢复)
func (rs *RedisStream) GetPendingTasks(streamName, groupName string) ([]redis.XPendingExt, error) {
	// XPENDING: 查询未确认的消息
	pending, err := rs.client.XPendingExt(rs.ctx, &redis.XPendingExtArgs{
		Stream: streamName,
		Group:  groupName,
		Start:  "-",
		End:    "+",
		Count:  100,
	}).Result()

	if err != nil {
		return nil, err
	}

	return pending, nil
}

// ClaimPendingTask 认领未确认的任务(从其他消费者)
func (rs *RedisStream) ClaimPendingTask(streamName, groupName, consumerName string, minIdleTime time.Duration, messageIDs ...string) ([]redis.XMessage, error) {
	// XCLAIM: 认领其他消费者的未确认消息
	messages, err := rs.client.XClaim(rs.ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumerName,
		MinIdle:  minIdleTime,
		Messages: messageIDs,
	}).Result()

	if err != nil {
		return nil, err
	}

	return messages, nil
}

// GetStreamLength 获取 Stream 长度
func (rs *RedisStream) GetStreamLength(streamName string) (int64, error) {
	// XLEN: 获取 Stream 中的消息数量
	return rs.client.XLen(rs.ctx, streamName).Result()
}

// GetStreamInfo 获取 Stream 信息
func (rs *RedisStream) GetStreamInfo(streamName string) (*redis.XInfoStream, error) {
	// XINFO STREAM: 获取 Stream 详细信息
	return rs.client.XInfoStream(rs.ctx, streamName).Result()
}

// GetGroupInfo 获取消费者组信息
func (rs *RedisStream) GetGroupInfo(streamName string) ([]redis.XInfoGroup, error) {
	// XINFO GROUPS: 获取消费者组信息
	return rs.client.XInfoGroups(rs.ctx, streamName).Result()
}

// ParseTask 解析消息为任务
func ParseTask(msg redis.XMessage) (*ScanTask, error) {
	// 直接从 JSON 反序列化整个任务
	taskJSON, ok := msg.Values["task"].(string)
	if !ok {
		return nil, fmt.Errorf("task field not found or invalid")
	}

	var task ScanTask
	if err := json.Unmarshal([]byte(taskJSON), &task); err != nil {
		return nil, fmt.Errorf("unmarshal task failed: %w", err)
	}

	return &task, nil
}
package main

import (
	"fmt"
	"os"
	"time"

	"Go_ENV/services"

	"github.com/google/uuid"
)

func main() {
	stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer stream.Close()

	streamName := "scan:tasks"
	groupName := "scanners"

	// 创建消费者组
	if err := stream.CreateConsumerGroup(streamName, groupName); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer group: %v\n", err)
		os.Exit(1)
	}

	fmt.Println("=== Redis Stream Producer (FIFO Demo) ===\n")

	// 创建测试任务
	tasks := []*services.ScanTask{
		{
			ID:          uuid.New().String(),
			Targets:     []string{"127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com", "127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com"},
			TemplateIDs: []string{"multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
	}

	// 添加任务到 Stream(按顺序添加,验证 FIFO)
	fmt.Printf("📤 Adding %d tasks to stream: %s\n", len(tasks), streamName)
	fmt.Println("   (Tasks will be consumed in FIFO order by Message ID)\n")

	for i, task := range tasks {
		// 添加小延迟,确保消息 ID 不同
		if i > 0 {
			time.Sleep(10 * time.Millisecond)
		}

		messageID, err := stream.AddTask(streamName, task)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Failed to add task: %v\n", err)
			continue
		}
		fmt.Printf("✓ Task %d: ID=%s, Targets=%v\n   → Message ID: %s\n",
			i+1, task.ID[:8], task.Targets, messageID)
	}

	// 显示 Stream 信息
	length, _ := stream.GetStreamLength(streamName)
	fmt.Printf("\n📊 Stream length: %d messages\n", length)

	info, _ := stream.GetStreamInfo(streamName)
	fmt.Printf("📊 First entry ID: %s (oldest, will be consumed first)\n", info.FirstEntry.ID)
	fmt.Printf("📊 Last entry ID: %s (newest, will be consumed last)\n", info.LastEntry.ID)

	groups, _ := stream.GetGroupInfo(streamName)
	for _, group := range groups {
		fmt.Printf("📊 Group: %s, Pending: %d, Consumers: %d\n",
			group.Name, group.Pending, group.Consumers)
	}

	fmt.Println("\n✅ All tasks added in FIFO order!")
	fmt.Println("💡 Run consumer to see tasks consumed in the same order")
}
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"Go_ENV/services"

	nuclei "github.com/projectdiscovery/nuclei/v3/lib"
	"github.com/projectdiscovery/nuclei/v3/pkg/catalog/disk"
	"github.com/projectdiscovery/nuclei/v3/pkg/output"
	"github.com/redis/go-redis/v9"
)

func main() {
	stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer stream.Close()

	streamName := "scan:tasks"
	groupName := "scanners"
	consumerName := fmt.Sprintf("consumer-%d", time.Now().Unix()) //消费者名称

	fmt.Printf("=== Redis Stream Consumer (FIFO Mode) ===\n")
	fmt.Printf("Consumer: %s\n", consumerName)
	fmt.Printf("Batch size: 3 (fetch 3 tasks at once)\n")
	fmt.Printf("Max concurrent: 3\n")
	fmt.Printf("💡 Tasks will be consumed in FIFO order (by Message ID)\n\n")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 优雅退出
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigChan
		fmt.Println("\n🛑 Shutting down consumer...")
		cancel()
	}()

	// 并发控制:最多 3 个任务同时运行
	semaphore := make(chan struct{}, 3)
	var wg sync.WaitGroup

	// 先处理 Pending 任务(故障恢复)
	recoverPendingTasks(stream, streamName, groupName, consumerName, &wg, semaphore)

	// 主循环:消费新任务
	fmt.Println("🔄 Waiting for new tasks...")
	for {
		select {
		case <-ctx.Done():
			wg.Wait()
			fmt.Println("✅ Consumer stopped")
			return
		default:
			// 一次拉取 3 条消息(与信号量容量一致)
			messages, err := stream.ReadTasks(streamName, groupName, consumerName, 3, 1*time.Second)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Read error: %v\n", err)
				continue
			}

			if messages == nil {
				continue
			}

			fmt.Printf("\n📦 Fetched %d tasks from stream (FIFO order by Message ID)\n", len(messages))

			// 显示消息 ID 顺序,验证 FIFO
			for i, msg := range messages {
				task, _ := services.ParseTask(msg)
				fmt.Printf("   [%d] Task ID: %s - Message ID: %s\n",
					i+1, task.ID[:8], msg.ID)
			}
			fmt.Println()

			// 处理每个任务(并发控制)
			for i, msg := range messages {
				// 获取信号量(如果已有 3 个任务在运行,这里会阻塞)
				semaphore <- struct{}{}

				wg.Add(1)

				go func(m redis.XMessage, index int) {
					startTime := time.Now()
					defer func() {
						duration := time.Since(startTime)
						<-semaphore
						fmt.Printf("[%s] 🔓 Semaphore released (took %v)\n",
							time.Now().Format("15:04:05.000"), duration)
						wg.Done()
					}()

					task, err := services.ParseTask(m)
					if err != nil {
						fmt.Printf("❌ Parse error: %v\n", err)
						return
					}

					fmt.Printf("\n[%s] 🔍 Task [%s] Starting scan\n",
						time.Now().Format("15:04:05.000"), task.ID[:8])
					fmt.Printf("   Message ID: %s\n", m.ID)
					fmt.Printf("   Targets: %v\n", task.Targets)
					fmt.Printf("   Templates: %v\n", task.TemplateIDs)

					// 执行 Nuclei 扫描
					if err := runNucleiScan(ctx, task); err != nil {
						fmt.Printf("[%s] ❌ Task [%s] Scan failed: %v (will retry)\n",
							time.Now().Format("15:04:05.000"), task.ID[:8], err)
						// 不发送 ACK,任务会被重新分配
						return
					}

					// 任务成功,发送 ACK
					if err := stream.AckTask(streamName, groupName, m.ID); err != nil {
						fmt.Printf("❌ ACK error: %v\n", err)
						return
					}

					fmt.Printf("[%s] ✅ Task [%s] Completed and ACKed\n",
						time.Now().Format("15:04:05.000"), task.ID[:8])
				}(msg, i)
			}
		}
	}
}

// recoverPendingTasks 恢复未确认的任务(故障恢复)
func recoverPendingTasks(stream *services.RedisStream, streamName, groupName, consumerName string, wg *sync.WaitGroup, semaphore chan struct{}) {
	pending, err := stream.GetPendingTasks(streamName, groupName)
	if err != nil {
		fmt.Printf("Failed to get pending tasks: %v\n", err)
		return
	}

	if len(pending) == 0 {
		return
	}

	fmt.Printf("🔄 Found %d pending tasks, recovering...\n", len(pending))

	for _, p := range pending {
		// 认领超过 10 秒未确认的任务
		if p.Idle < 10*time.Second {
			continue
		}

		messages, err := stream.ClaimPendingTask(streamName, groupName, consumerName, 10*time.Second, p.ID)
		if err != nil {
			fmt.Printf("Failed to claim task %s: %v\n", p.ID, err)
			continue
		}

		for _, msg := range messages {
			semaphore <- struct{}{}
			wg.Add(1)

			go func(m redis.XMessage) {
				defer func() {
					<-semaphore
					wg.Done()
				}()

				task, _ := services.ParseTask(m)
				fmt.Printf("🔄 [%s] Recovering task (Message ID: %s)\n", task.ID[:8], m.ID)

				// 重新执行 Nuclei 扫描
				if err := runNucleiScan(context.Background(), task); err != nil {
					fmt.Printf("❌ [%s] Recovered task failed: %v\n", task.ID[:8], err)
					return
				}

				stream.AckTask(streamName, groupName, m.ID)
				fmt.Printf("✅ [%s] Recovered task completed\n", task.ID[:8])
			}(msg)
		}
	}
}

// runNucleiScan 执行 Nuclei 扫描
func runNucleiScan(ctx context.Context, task *services.ScanTask) error {
	if len(task.Targets) == 0 {
		return fmt.Errorf("targets is empty")
	}

	// 获取模板目录
	home, err := os.UserHomeDir()
	if err != nil {
		return fmt.Errorf("get home dir failed: %w", err)
	}
	templateDir := filepath.Join(home, "nuclei-templates")

	// 配置 Nuclei 选项
	opts := []nuclei.NucleiSDKOptions{
		nuclei.WithCatalog(disk.NewCatalog(templateDir)),
		nuclei.DisableUpdateCheck(),
	}

	// 如果指定了模板 ID,添加过滤器
	if len(task.TemplateIDs) > 0 {
		opts = append(opts, nuclei.WithTemplateFilters(nuclei.TemplateFilters{
			IDs: task.TemplateIDs,
		}))
	}

	// 创建 Nuclei 引擎
	engine, err := nuclei.NewNucleiEngineCtx(ctx, opts...)
	if err != nil {
		return fmt.Errorf("create nuclei engine failed: %w", err)
	}
	defer engine.Close()

	// 加载模板
	if err := engine.LoadAllTemplates(); err != nil {
		return fmt.Errorf("load templates failed: %w", err)
	}

	// 加载目标
	engine.LoadTargets(task.Targets, true)

	// 执行扫描并处理结果
	fmt.Printf("   ⏳ Task [%s] Scanning...\n", task.ID[:8])
	return engine.ExecuteCallbackWithCtx(ctx, func(ev *output.ResultEvent) {
		if ev == nil || !ev.MatcherStatus {
			return
		}

		severity := ev.Info.SeverityHolder.Severity.String()
		if severity == "" {
			severity = "unknown"
		}

		fmt.Printf("   [VULN] Task [%s] severity=%s template=%s name=%s matched=%s\n",
			task.ID[:8],
			severity,
			ev.TemplateID,
			ev.Info.Name,
			ev.Matched,
		)
	})
}
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"Go_ENV/services"
)

func main() {
	stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer stream.Close()

	streamName := "scan:tasks"

	fmt.Println("=== Redis Stream Monitor (FIFO Verification) ===\n")

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	// 初始显示
	showStreamInfo(stream, streamName)

	for {
		select {
		case <-sigChan:
			fmt.Println("\n🛑 Shutting down monitor...")
			return
		case <-ticker.C:
			showStreamInfo(stream, streamName)
		}
	}
}

func showStreamInfo(stream *services.RedisStream, streamName string) {
	separator := "============================================================"
	fmt.Println("\n" + separator)
	fmt.Printf("[%s] 📊 Stream Status\n", time.Now().Format("15:04:05"))
	fmt.Println(separator)

	// Stream 信息
	length, _ := stream.GetStreamLength(streamName)
	fmt.Printf("   Stream length: %d messages\n", length)

	info, err := stream.GetStreamInfo(streamName)
	if err == nil {
		fmt.Printf("   First entry ID: %s (oldest, FIFO head)\n", info.FirstEntry.ID)
		fmt.Printf("   Last entry ID: %s (newest, FIFO tail)\n", info.LastEntry.ID)
	}

	// 消费者组信息
	groups, err := stream.GetGroupInfo(streamName)
	if err == nil {
		fmt.Println("\n   Consumer Groups:")
		for _, group := range groups {
			fmt.Printf("   - %s: Pending=%d, Consumers=%d, Lag=%d\n",
				group.Name, group.Pending, group.Consumers, group.Lag)
		}
	}

	// Pending 任务
	pending, err := stream.GetPendingTasks(streamName, "scanners")
	if err == nil && len(pending) > 0 {
		fmt.Printf("\n   ⚠️  Pending tasks: %d\n", len(pending))
		for i, p := range pending {
			if i >= 5 {
				fmt.Printf("   ... and %d more\n", len(pending)-5)
				break
			}
			fmt.Printf("   - %s: Consumer=%s, Idle=%v, Deliveries=%d\n",
				p.ID, p.Consumer, p.Idle, p.RetryCount)
		}
	}

	fmt.Println(separator)
}

运行效果如下:

Producer.go文件:

=== Redis Stream Producer (FIFO Demo) ===

📤 Adding 5 tasks to stream: scan:tasks
   (Tasks will be consumed in FIFO order by Message ID)

✓ Task 1: ID=639f0385, Targets=[127.0.0.1:8888]
   → Message ID: 1777177221155-0
✓ Task 2: ID=b49c1e37, Targets=[www.baidu.com 127.0.0.1:8888]
   → Message ID: 1777177221166-0
✓ Task 3: ID=39f4df6b, Targets=[www.baidu.com]
   → Message ID: 1777177221177-0
✓ Task 4: ID=bac46c3e, Targets=[127.0.0.1:8888]
   → Message ID: 1777177221188-0
✓ Task 5: ID=c7904c34, Targets=[www.baidu.com]
   → Message ID: 1777177221199-0

📊 Stream length: 5 messages
📊 First entry ID: 1777177221155-0 (oldest, will be consumed first)
📊 Last entry ID: 1777177221199-0 (newest, will be consumed last)
📊 Group: scanners, Pending: 0, Consumers: 0

✅ All tasks added in FIFO order!
💡 Run consumer to see tasks consumed in the same order

然后运行监测文件,出是消息如下:

=== Redis Stream Monitor (FIFO Verification) ===


============================================================
[12:21:24] 📊 Stream Status
============================================================
   Stream length: 5 messages
   First entry ID: 1777177221155-0 (oldest, FIFO head)
   Last entry ID: 1777177221199-0 (newest, FIFO tail)

   Consumer Groups:
   - scanners: Pending=0, Consumers=0, Lag=5
============================================================

后续每隔3s就会再次获取消费组相关信息。随后运行消费者文件即可:

=== Redis Stream Consumer (FIFO Mode) ===
Consumer: consumer-1777177803
Batch size: 3 (fetch 3 tasks at once)
Max concurrent: 3
💡 Tasks will be consumed in FIFO order (by Message ID)

🔄 Waiting for new tasks...

📦 Fetched 3 tasks from stream (FIFO order by Message ID)
   [1] Task ID: 639f0385 - Message ID: 1777177221155-0
   [2] Task ID: b49c1e37 - Message ID: 1777177221166-0
   [3] Task ID: 39f4df6b - Message ID: 1777177221177-0


[12:30:03.343] 🔍 Task [639f0385] Starting scan

[12:30:03.343] 🔍 Task [b49c1e37] Starting scan
   Message ID: 1777177221166-0
   Targets: [www.baidu.com 127.0.0.1:8888]
   Templates: [local-flask multi-request-check]

[12:30:03.343] 🔍 Task [39f4df6b] Starting scan
   Message ID: 1777177221155-0
   Targets: [127.0.0.1:8888]
   Templates: [local-flask multi-request-check]
   Message ID: 1777177221177-0
   Targets: [www.baidu.com]
   Templates: [local-flask multi-request-check]

📦 Fetched 2 tasks from stream (FIFO order by Message ID)
   [1] Task ID: bac46c3e - Message ID: 1777177221188-0
   [2] Task ID: c7904c34 - Message ID: 1777177221199-0

   ⏳ Task [639f0385] Scanning...
[12:30:04.659] ✅ Task [639f0385] Completed and ACKed
[12:30:04.659] 🔓 Semaphore released (took 1.316242208s)

[12:30:04.659] 🔍 Task [bac46c3e] Starting scan
   Message ID: 1777177221188-0
   Targets: [127.0.0.1:8888]
   Templates: [local-flask]
   ⏳ Task [bac46c3e] Scanning...
[12:30:04.779] ✅ Task [bac46c3e] Completed and ACKed
[12:30:04.779] 🔓 Semaphore released (took 120.020959ms)

[12:30:04.779] 🔍 Task [c7904c34] Starting scan
   Message ID: 1777177221199-0
   Targets: [www.baidu.com]
   Templates: [multi-request-check]
   ⏳ Task [39f4df6b] Scanning...
   ⏳ Task [b49c1e37] Scanning...
   ⏳ Task [c7904c34] Scanning...
   [VULN] Task [39f4df6b] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[12:30:05.360] ✅ Task [39f4df6b] Completed and ACKed
[12:30:05.360] 🔓 Semaphore released (took 2.017454292s)
   [VULN] Task [b49c1e37] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[12:30:05.454] ✅ Task [b49c1e37] Completed and ACKed
[12:30:05.454] 🔓 Semaphore released (took 2.111749083s)
   [VULN] Task [c7904c34] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[12:30:05.596] ✅ Task [c7904c34] Completed and ACKed
[12:30:05.596] 🔓 Semaphore released (took 817.565458ms)

🛑 Shutting down consumer...
✅ Consumer stopped

很快就完成了,monitor.go的终端输出变成了:

============================================================
[12:30:20] 📊 Stream Status
============================================================
   Stream length: 5 messages
   First entry ID: 1777177221155-0 (oldest, FIFO head)
   Last entry ID: 1777177221199-0 (newest, FIFO tail)

   Consumer Groups:
   - scanners: Pending=0, Consumers=1, Lag=0
============================================================

成功消费完了全部的消息。

简单解析以及一些知识点:

有个问题就是,从代码实例以及简单思考中就可以知道,因为是需要返回ack来表示消费者是否运行完任务,若没返回ack的话就会一直存在于pel中,此时就可能被其他消费者给转移到自己的pel中并下发任务,这样会导致重复下发任务,资源浪费,如何解决呢?至少有两种方式

后续可以优化的点:

两者对比

都可以实现持久化以及先进先出,但是redis stream有两个非常好的点:

同时通过先cover pel => 再取新的消息这个流程,可以很好的实现并发+先下发的任务先运行,可以极大提高使用以及运行效率。

实时分析

这个板块主要关注如何实现扫描结果的同步。涉及到redis的两个使用场景:Redis Pub/Sub和redis stream,下面来分别看看各自的使用场景,最后再看要选择哪个。

Redis发布订阅

这里涉及到Redis的发布/订阅(Pub/Sub)模式,这是一个消息通信模式,用于实现消息的发布和订阅,具体如下:

这个机制可以用于实现数据同步等功能,这里主要就这个点来看看代码是怎么实现的。

Redis Pub/Sub实现的基本代码如下:

package main

import (
	"context"
	"fmt"
	"github.com/redis/go-redis/v9"
	"time"
)

var ctx = context.Background()

func main() {
	// 1. 初始化客户端
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "redis",
		DB:       0,
	})

	channelName := "dast.task.events"

	// ---------------- Subscriber (订阅者) ----------------
	// 启动一个 goroutine 模拟异步监听
	go func() {
		pubsub := rdb.Subscribe(ctx, channelName)
		defer pubsub.Close()

		fmt.Println("[*] 订阅者已就绪,正在监听频道:", channelName)

		// 持续接收消息
		for {
			msg, err := pubsub.ReceiveMessage(ctx)
			if err != nil {
				panic(err)
			}
			fmt.Printf("[Subscriber] 收到新任务详情: %s\n", msg.Payload)
		}
	}()

	// 等待一秒确保订阅者已启动
	time.Sleep(time.Second)

	// ---------------- Publisher (发布者) ----------------
	fmt.Println("[+] 发布者准备发送消息...")

	payload := "START_SCAN: https://example.com"
	err := rdb.Publish(ctx, channelName, payload).Err()
	if err != nil {
		panic(err)
	}

	// 保持主程序运行
	select {}
}

实现效果:

[*] 订阅者已就绪,正在监听频道: dast.task.events
[+] 发布者准备发送消息...
[Subscriber] 收到新任务详情: START_SCAN: https://example.com

代码实现细节如下:

通过保持主程序运行,让其中的goroutine语句持续生效,由此达到持续接收消息的效果。我们可以直接通过redis语句来往对应频道下发消息,看这里能否接收到:

PUBLISH "dast.task.events" "START_SCAN: hahahahaha,just a test"

image-20260425151020062

接收者同步接收到消息:

image-20260425151102133

————————————————————

Redis Stream

基本概念前面说过,大概思路就是新建一个stream,然后在任务下发、扫描结束的时候往stream中追加任务状态的信息,然后消费者(用于监测)就去抓取并更新对应的任务状态,由此实现数据同步。参考如下架构:

┌─────────────┐
│  扫描任务    │ (你的业务代码)
└──────┬──────┘
       │ 1. 发送进度事件
┌─────────────────────────────┐
│     Redis Stream            │ (消息队列)
│   progress:stream           │
└──────┬──────────────────────┘
       │ 2. 多消费者订阅
   ┌───┴────┬─────────┐
   ▼        ▼         ▼
┌────────┐ ┌────────┐ ┌────────┐
│消费者1  │ │监控面板 │ │同步服务│
│(分析)  │ │(展示)  │ │(持久化)│
└────────┘ └────────┘ └───┬────┘
                          │ 3. 写入 Redis Hash
                    ┌──────────────┐
                    │  Redis Hash  │ (快速查询)
                    │task:progress:*│
                    └──────────────┘

架构如上,具体实现就不多说了。

两者对比

优点:是解耦合的,发布者和订阅者之间互不影响,适合实时聊天应用程序、实时新闻推送等场景

缺点:Redis发布订阅机制有个问题是消息无法持久化,如果出现网络断开、redis宕机,消息就会被丢弃。

重点就是持久化,可以防止消息被丢弃,同时可以实现数据同步。

分布式锁

预期实现场景:对下发的任务中设置的相关信息进行处理,已实现分布式的调度。

Redis分布式锁是一种基于redis的高性能、高可用的分布式互斥机制,用于解决分布式系统中多个服务实例对共享资源的并发访问问题。其核心原理是利用redis键的唯一性和原子性操作来实现锁的获取与释放。通过对消息进行加锁,可以保证某个资源在同一时间只能被一个执行者处理。

redis分布式锁其实就是一个特殊用法的键值对,其存在如下结构:

一般需要依靠SET命令的两个关键参数:

具体格式形如:

SET key value NX EX ttl

另外还有一个非常重要的点就是redis分布式锁在实际运用中的实现方式,具体可以看后续代码的分析。

————————————

结合到nuclei+redis stream+分布式锁的代码如下:

package services

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// ScanTask 扫描任务
type ScanTask struct {
	ID          string
	Targets     []string
	TemplateIDs []string
	CreatedAt   int64
}

// RedisStream Redis Stream 管理器
type RedisStream struct {
	client *redis.Client
	ctx    context.Context
}

// NewRedisStream 创建 Stream 管理器
func NewRedisStream(addr, password string, db int) (*RedisStream, error) {
	client := redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: password,
		DB:       db,
	})

	ctx := context.Background()
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, fmt.Errorf("redis connection failed: %w", err)
	}

	return &RedisStream{client: client, ctx: ctx}, nil
}

func (rs *RedisStream) Close() error {
	return rs.client.Close()
}

// CreateConsumerGroup 创建消费者组
func (rs *RedisStream) CreateConsumerGroup(streamName, groupName string) error {
	// 尝试创建消费者组,如果已存在会返回错误(忽略)
	err := rs.client.XGroupCreateMkStream(rs.ctx, streamName, groupName, "0").Err()
	if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
		return err
	}
	return nil
}

// AddTask 添加任务到 Stream(FIFO:按消息 ID 顺序)
func (rs *RedisStream) AddTask(streamName string, task *ScanTask) (string, error) {
	// 直接将整个任务序列化为 JSON
	taskJSON, err := json.Marshal(task)
	if err != nil {
		return "", fmt.Errorf("marshal task failed: %w", err)
	}

	// XADD: 添加消息到 Stream,Redis 自动生成递增的消息 ID
	// 消息 ID 格式:时间戳(毫秒)-序列号,保证严格递增(FIFO)
	messageID, err := rs.client.XAdd(rs.ctx, &redis.XAddArgs{
		Stream: streamName,
		Values: map[string]interface{}{
			"task": string(taskJSON), // 整个任务作为一个 JSON 字段
		},
	}).Result()

	if err != nil {
		return "", err
	}

	return messageID, nil
}

// ReadTasks 从 Stream 读取任务(消费者组模式,FIFO 顺序)
func (rs *RedisStream) ReadTasks(streamName, groupName, consumerName string, count int64, block time.Duration) ([]redis.XMessage, error) {
	// XREADGROUP: 从消费者组读取消息
	// ">" 表示只读取未被该消费者组消费的新消息
	// Redis 按消息 ID 字典序返回,保证 FIFO(先进先出)
	streams, err := rs.client.XReadGroup(rs.ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: consumerName,
		Streams:  []string{streamName, ">"}, // ">" = 读取新消息(未被消费的)
		Count:    count,                     // 批量读取数量
		Block:    block,                     // 阻塞等待时间,用来redis是直接返回空还是让消费者等待一定时间看是否有新的消息进来,0代表无限阻塞直到新消息进来,
	}).Result()

	if err != nil {
		if err == redis.Nil {
			return nil, nil // 没有新消息
		}
		return nil, err
	}

	if len(streams) == 0 || len(streams[0].Messages) == 0 {
		return nil, nil
	}

	// 返回的消息已按消息 ID 排序(FIFO)
	return streams[0].Messages, nil
}

// AckTask 确认任务已处理
func (rs *RedisStream) AckTask(streamName, groupName string, messageIDs ...string) error {
	// XACK: 确认消息已处理
	return rs.client.XAck(rs.ctx, streamName, groupName, messageIDs...).Err()
}

// GetPendingTasks 获取未确认的任务(用于故障恢复)
func (rs *RedisStream) GetPendingTasks(streamName, groupName string) ([]redis.XPendingExt, error) {
	// XPENDING: 查询未确认的消息
	pending, err := rs.client.XPendingExt(rs.ctx, &redis.XPendingExtArgs{
		Stream: streamName,
		Group:  groupName,
		Start:  "-",
		End:    "+",
		Count:  100,
	}).Result()

	if err != nil {
		return nil, err
	}

	return pending, nil
}

// ClaimPendingTask 认领未确认的任务(从其他消费者)
func (rs *RedisStream) ClaimPendingTask(streamName, groupName, consumerName string, minIdleTime time.Duration, messageIDs ...string) ([]redis.XMessage, error) {
	// XCLAIM: 认领其他消费者的未确认消息
	messages, err := rs.client.XClaim(rs.ctx, &redis.XClaimArgs{
		Stream:   streamName,
		Group:    groupName,
		Consumer: consumerName,
		MinIdle:  minIdleTime,
		Messages: messageIDs,
	}).Result()

	if err != nil {
		return nil, err
	}

	return messages, nil
}

// GetStreamLength 获取 Stream 长度
func (rs *RedisStream) GetStreamLength(streamName string) (int64, error) {
	// XLEN: 获取 Stream 中的消息数量
	return rs.client.XLen(rs.ctx, streamName).Result()
}

// GetStreamInfo 获取 Stream 信息
func (rs *RedisStream) GetStreamInfo(streamName string) (*redis.XInfoStream, error) {
	// XINFO STREAM: 获取 Stream 详细信息
	return rs.client.XInfoStream(rs.ctx, streamName).Result()
}

// GetGroupInfo 获取消费者组信息
func (rs *RedisStream) GetGroupInfo(streamName string) ([]redis.XInfoGroup, error) {
	// XINFO GROUPS: 获取消费者组信息
	return rs.client.XInfoGroups(rs.ctx, streamName).Result()
}

// ParseTask 解析消息为任务
func ParseTask(msg redis.XMessage) (*ScanTask, error) {
	// 直接从 JSON 反序列化整个任务
	taskJSON, ok := msg.Values["task"].(string)
	if !ok {
		return nil, fmt.Errorf("task field not found or invalid")
	}

	var task ScanTask
	if err := json.Unmarshal([]byte(taskJSON), &task); err != nil {
		return nil, fmt.Errorf("unmarshal task failed: %w", err)
	}

	return &task, nil
}

// AcquireTaskLock 获取任务分布式锁
func (rs *RedisStream) AcquireTaskLock(messageID string, consumerName string, ttl time.Duration) (bool, error) {
	lockKey := fmt.Sprintf("lock:task:%s", messageID)
	// 使用 SET 命令配合 NX 和 EX 选项实现分布式锁
	// NX: 只在键不存在时设置
	// TTL: 设置过期时间
	result := rs.client.SetArgs(rs.ctx, lockKey, consumerName, redis.SetArgs{
		Mode: "NX", // 只在键不存在时设置
		TTL:  ttl,  // 设置过期时间
	})
	
	if err := result.Err(); err != nil {
		// redis.Nil 表示键已存在(获取锁失败)
		if err == redis.Nil {
			return false, nil
		}
		return false, fmt.Errorf("acquire lock failed: %w", err)
	}
	
	// 检查返回值,"OK" 表示成功获取锁
	val, err := result.Result()
	if err != nil {
		if err == redis.Nil {
			return false, nil
		}
		return false, fmt.Errorf("acquire lock failed: %w", err)
	}
	
	return val == "OK", nil
}

// ReleaseTaskLock 释放任务分布式锁
func (rs *RedisStream) ReleaseTaskLock(messageID string, consumerName string) error {
	lockKey := fmt.Sprintf("lock:task:%s", messageID)
	
	// Lua 脚本确保只有锁的持有者才能释放
	script := `
		if redis.call("get", KEYS[1]) == ARGV[1] then
			return redis.call("del", KEYS[1])
		else
			return 0
		end
	`
	
	_, err := rs.client.Eval(rs.ctx, script, []string{lockKey}, consumerName).Result()
	return err
}

// RenewTaskLock 续期任务锁(用于长时间运行的任务)
func (rs *RedisStream) RenewTaskLock(messageID string, consumerName string, ttl time.Duration) (bool, error) {
	lockKey := fmt.Sprintf("lock:task:%s", messageID)
	
	// Lua 脚本确保只有锁的持有者才能续期
	script := `
		if redis.call("get", KEYS[1]) == ARGV[1] then
			return redis.call("expire", KEYS[1], ARGV[2])
		else
			return 0
		end
	`
	
	result, err := rs.client.Eval(rs.ctx, script, []string{lockKey}, consumerName, int(ttl.Seconds())).Result()
	if err != nil {
		return false, err
	}
	
	return result.(int64) == 1, nil
}

// GetConsumerInfo 获取消费者详细信息
func (rs *RedisStream) GetConsumerInfo(streamName, groupName string) ([]redis.XInfoConsumer, error) {
	// XINFO CONSUMERS: 获取消费者详细信息
	return rs.client.XInfoConsumers(rs.ctx, streamName, groupName).Result()
}
package main

import (
	"fmt"
	"os"
	"time"

	"Go_ENV/services"

	"github.com/google/uuid"
)

func main() {
	stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer stream.Close()

	streamName := "scan:tasks"
	groupName := "scanners"

	// 创建消费者组
	if err := stream.CreateConsumerGroup(streamName, groupName); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer group: %v\n", err)
		os.Exit(1)
	}

	fmt.Println("=== Redis Stream Producer (FIFO Demo) ===\n")

	// 创建测试任务
	tasks := []*services.ScanTask{
		{
			ID:          uuid.New().String(),
			Targets:     []string{"127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com", "127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com"},
			TemplateIDs: []string{"local-flask", "multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"127.0.0.1:8888"},
			TemplateIDs: []string{"local-flask"},
			CreatedAt:   time.Now().UnixMilli(),
		},
		{
			ID:          uuid.New().String(),
			Targets:     []string{"www.baidu.com"},
			TemplateIDs: []string{"multi-request-check"},
			CreatedAt:   time.Now().UnixMilli(),
		},
	}

	// 添加任务到 Stream(按顺序添加,验证 FIFO)
	fmt.Printf("📤 Adding %d tasks to stream: %s\n", len(tasks), streamName)
	fmt.Println("   (Tasks will be consumed in FIFO order by Message ID)\n")

	for i, task := range tasks {
		// 添加小延迟,确保消息 ID 不同
		if i > 0 {
			time.Sleep(10 * time.Millisecond)
		}

		messageID, err := stream.AddTask(streamName, task)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Failed to add task: %v\n", err)
			continue
		}
		fmt.Printf("✓ Task %d: ID=%s, Targets=%v\n   → Message ID: %s\n",
			i+1, task.ID[:8], task.Targets, messageID)
	}

	// 显示 Stream 信息
	length, _ := stream.GetStreamLength(streamName)
	fmt.Printf("\n📊 Stream length: %d messages\n", length)

	info, _ := stream.GetStreamInfo(streamName)
	fmt.Printf("📊 First entry ID: %s (oldest, will be consumed first)\n", info.FirstEntry.ID)
	fmt.Printf("📊 Last entry ID: %s (newest, will be consumed last)\n", info.LastEntry.ID)

	groups, _ := stream.GetGroupInfo(streamName)
	for _, group := range groups {
		fmt.Printf("📊 Group: %s, Pending: %d, Consumers: %d\n",
			group.Name, group.Pending, group.Consumers)
	}

	fmt.Println("\n✅ All tasks added in FIFO order!")
	fmt.Println("💡 Run consumer to see tasks consumed in the same order")
}
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"Go_ENV/services"

	nuclei "github.com/projectdiscovery/nuclei/v3/lib"
	"github.com/projectdiscovery/nuclei/v3/pkg/catalog/disk"
	"github.com/projectdiscovery/nuclei/v3/pkg/output"
	"github.com/redis/go-redis/v9"
)

func main() {
	stream, err := services.NewRedisStream("localhost:6379", "redis", 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to Redis: %v\n", err)
		os.Exit(1)
	}
	defer stream.Close()

	streamName := "scan:tasks"
	groupName := "scanners"
	consumerName := fmt.Sprintf("consumer-%d", time.Now().Unix()) //消费者名称

	// 锁配置
	const (
		lockTTL       = 6 * time.Minute // 锁的过期时间:6 分钟
		lockRenewTime = 5 * time.Minute // 锁的续期间隔:5 分钟
	)

	fmt.Printf("=== Redis Stream Consumer (FIFO Mode with Distributed Lock) ===\n")
	fmt.Printf("Consumer: %s\n", consumerName)
	fmt.Printf("Max concurrent: 3\n")
	fmt.Printf("Lock TTL: %v (自动续期间隔: %v)\n", lockTTL, lockRenewTime)
	fmt.Printf("💡 Tasks will be consumed in FIFO order with distributed lock protection\n")
	fmt.Printf("💡 Priority: PEL tasks → New tasks\n\n")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 优雅退出
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigChan
		fmt.Println("\n🛑 Shutting down consumer...")
		cancel()
	}()

	// 并发控制:最多 3 个任务同时运行
	semaphore := make(chan struct{}, 3)
	var wg sync.WaitGroup

	// 主循环:持续消费任务
	fmt.Println("🔄 Starting task consumption loop...")
	for {
		select {
		case <-ctx.Done():
			wg.Wait()
			fmt.Println("✅ Consumer stopped")
			return
		default:
			// 等待信号量可用(阻塞点)
			semaphore <- struct{}{}

			// 优先处理 PEL 中的任务
			processed := false
			pending, err := stream.GetPendingTasks(streamName, groupName)
			if err == nil && len(pending) > 0 {
				// 尝试认领并处理第一个超时的任务
				for _, p := range pending {
					// 认领超过 10 秒未确认的任务
					if p.Idle < 10*time.Second {
						continue
					}

					// 先尝试获取锁,如果获取失败说明任务正在被处理
					locked, err := stream.AcquireTaskLock(p.ID, consumerName, lockTTL)
					if err != nil {
						fmt.Printf("[%s] ⚠️  PEL Task [%s] 检查锁失败: %v\n",
							time.Now().Format("15:04:05.000"), p.ID[:8], err)
						continue
					}

					if !locked {
						// 锁已被其他消费者持有,说明任务正在被处理,跳过
						fmt.Printf("[%s] ⏭️  PEL Task [%s] 已被其他消费者锁定,跳过\n",
							time.Now().Format("15:04:05.000"), p.ID[:8])
						continue
					}

					// 成功获取锁,说明任务未被处理,可以认领
					fmt.Printf("[%s] 🔒 PEL Task [%s] 获取锁成功,准备认领\n",
						time.Now().Format("15:04:05.000"), p.ID[:8])

					// 认领任务
					messages, err := stream.ClaimPendingTask(streamName, groupName, consumerName, 10*time.Second, p.ID)
					if err != nil || len(messages) == 0 {
						// 认领失败,释放锁
						stream.ReleaseTaskLock(p.ID, consumerName)
						fmt.Printf("[%s] ❌ PEL Task [%s] 认领失败: %v\n",
							time.Now().Format("15:04:05.000"), p.ID[:8], err)
						continue
					}

					msg := messages[0]
					wg.Add(1)
					// 注意:锁已经获取,在 processTaskWithLock 中不需要再次获取
					go processTaskWithLockAcquired(ctx, stream, streamName, groupName, consumerName, msg, &wg, semaphore, true, lockTTL, lockRenewTime)
					processed = true
					break
				}
			}

			// 如果没有处理 PEL 任务,则拉取新任务
			if !processed {
				messages, err := stream.ReadTasks(streamName, groupName, consumerName, 1, 1*time.Second)
				if err != nil {
					fmt.Fprintf(os.Stderr, "Read error: %v\n", err)
					<-semaphore // 释放信号量
					time.Sleep(100 * time.Millisecond)
					continue
				}

				if messages == nil || len(messages) == 0 {
					<-semaphore // 释放信号量
					continue
				}

				msg := messages[0]
				wg.Add(1)
				go processTaskWithLock(ctx, stream, streamName, groupName, consumerName, msg, &wg, semaphore, false, lockTTL, lockRenewTime)
			}
		}
	}
}

// processTaskWithLock 处理任务(带分布式锁)
func processTaskWithLock(ctx context.Context, stream *services.RedisStream, streamName, groupName, consumerName string,
	msg redis.XMessage, wg *sync.WaitGroup, semaphore chan struct{}, isRecovered bool, lockTTL, lockRenewTime time.Duration) {
	
	startTime := time.Now()
	defer func() {
		duration := time.Since(startTime)
		<-semaphore
		fmt.Printf("[%s] 🔓 Semaphore released (took %v)\n",
			time.Now().Format("15:04:05.000"), duration)
		wg.Done()
	}()

	task, err := services.ParseTask(msg)
	if err != nil {
		fmt.Printf("❌ Parse error: %v\n", err)
		return
	}

	// 尝试获取分布式锁
	locked, err := stream.AcquireTaskLock(msg.ID, consumerName, lockTTL)
	if err != nil {
		fmt.Printf("[%s] ❌ Task [%s] Failed to acquire lock: %v\n",
			time.Now().Format("15:04:05.000"), task.ID[:8], err)
		return
	}

	if !locked {
		// 锁已被其他消费者持有,跳过此任务
		fmt.Printf("[%s] ⏭️  Task [%s] Already locked by another consumer, skipping\n",
			time.Now().Format("15:04:05.000"), task.ID[:8])
		return
	}

	// 成功获取锁,确保最后释放
	defer func() {
		if err := stream.ReleaseTaskLock(msg.ID, consumerName); err != nil {
			fmt.Printf("[%s] ⚠️  Task [%s] Failed to release lock: %v\n",
				time.Now().Format("15:04:05.000"), task.ID[:8], err)
		}
	}()

	processTaskCore(ctx, stream, streamName, groupName, consumerName, msg, task, lockTTL, lockRenewTime, isRecovered)
}

// processTaskWithLockAcquired 处理任务(锁已获取)
func processTaskWithLockAcquired(ctx context.Context, stream *services.RedisStream, streamName, groupName, consumerName string,
	msg redis.XMessage, wg *sync.WaitGroup, semaphore chan struct{}, isRecovered bool, lockTTL, lockRenewTime time.Duration) {
	
	startTime := time.Now()
	defer func() {
		duration := time.Since(startTime)
		<-semaphore
		fmt.Printf("[%s] 🔓 Semaphore released (took %v)\n",
			time.Now().Format("15:04:05.000"), duration)
		wg.Done()
	}()

	task, err := services.ParseTask(msg)
	if err != nil {
		fmt.Printf("❌ Parse error: %v\n", err)
		// 解析失败,释放锁
		stream.ReleaseTaskLock(msg.ID, consumerName)
		return
	}

	// 锁已经在外部获取,确保最后释放
	defer func() {
		if err := stream.ReleaseTaskLock(msg.ID, consumerName); err != nil {
			fmt.Printf("[%s] ⚠️  Task [%s] Failed to release lock: %v\n",
				time.Now().Format("15:04:05.000"), task.ID[:8], err)
		}
	}()

	processTaskCore(ctx, stream, streamName, groupName, consumerName, msg, task, lockTTL, lockRenewTime, isRecovered)
}

// processTaskCore 任务处理核心逻辑
func processTaskCore(ctx context.Context, stream *services.RedisStream, streamName, groupName, consumerName string,
	msg redis.XMessage, task *services.ScanTask, lockTTL, lockRenewTime time.Duration, isRecovered bool) {
	
	recoveredTag := ""
	if isRecovered {
		recoveredTag = " [RECOVERED]"
	}

	fmt.Printf("\n[%s] 🔒 Task [%s]%s Lock acquired, starting scan\n",
		time.Now().Format("15:04:05.000"), task.ID[:8], recoveredTag)
	fmt.Printf("   Message ID: %s\n", msg.ID)
	fmt.Printf("   Targets: %v\n", task.Targets)
	fmt.Printf("   Templates: %v\n", task.TemplateIDs)
	fmt.Printf("   Lock TTL: %v (续期间隔: %v)\n", lockTTL, lockRenewTime)

	// 启动锁续期 goroutine
	renewCtx, cancelRenew := context.WithCancel(ctx)
	defer cancelRenew()
	
	go func() {
		ticker := time.NewTicker(lockRenewTime)
		defer ticker.Stop()
		
		renewCount := 0
		for {
			select {
			case <-renewCtx.Done():
				if renewCount > 0 {
					fmt.Printf("[%s] 🛑 Task [%s] 停止锁续期(共续期 %d 次)\n",
						time.Now().Format("15:04:05.000"), task.ID[:8], renewCount)
				}
				return
			case <-ticker.C:
				renewed, err := stream.RenewTaskLock(msg.ID, consumerName, lockTTL)
				if err != nil || !renewed {
					fmt.Printf("[%s] ⚠️  Task [%s] 锁续期失败: %v\n",
						time.Now().Format("15:04:05.000"), task.ID[:8], err)
				} else {
					renewCount++
					fmt.Printf("[%s] 🔄 Task [%s] 锁续期成功(第 %d 次,TTL 重置为 %v)\n",
						time.Now().Format("15:04:05.000"), task.ID[:8], renewCount, lockTTL)
				}
			}
		}
	}()

	// 执行 Nuclei 扫描
	if err := runNucleiScan(ctx, task); err != nil {
		fmt.Printf("[%s] ❌ Task [%s] Scan failed: %v (will retry)\n",
			time.Now().Format("15:04:05.000"), task.ID[:8], err)
		// 不发送 ACK,任务会被重新分配
		return
	}

	// 任务成功,发送 ACK
	if err := stream.AckTask(streamName, groupName, msg.ID); err != nil {
		fmt.Printf("❌ ACK error: %v\n", err)
		return
	}

	fmt.Printf("[%s] ✅ Task [%s] Completed and ACKed\n",
		time.Now().Format("15:04:05.000"), task.ID[:8])
}

// recoverPendingTasks 恢复未确认的任务(故障恢复)- 已废弃,逻辑合并到主循环
// func recoverPendingTasks(stream *services.RedisStream, streamName, groupName, consumerName string, wg *sync.WaitGroup, semaphore chan struct{}) {
// 	// 此函数已废弃,PEL 任务处理已合并到主循环中
// }

// runNucleiScan 执行 Nuclei 扫描
func runNucleiScan(ctx context.Context, task *services.ScanTask) error {
	if len(task.Targets) == 0 {
		return fmt.Errorf("targets is empty")
	}

	// 获取模板目录
	home, err := os.UserHomeDir()
	if err != nil {
		return fmt.Errorf("get home dir failed: %w", err)
	}
	templateDir := filepath.Join(home, "nuclei-templates")

	// 配置 Nuclei 选项
	opts := []nuclei.NucleiSDKOptions{
		nuclei.WithCatalog(disk.NewCatalog(templateDir)),
		nuclei.DisableUpdateCheck(),
	}

	// 如果指定了模板 ID,添加过滤器
	if len(task.TemplateIDs) > 0 {
		opts = append(opts, nuclei.WithTemplateFilters(nuclei.TemplateFilters{
			IDs: task.TemplateIDs,
		}))
	}

	// 创建 Nuclei 引擎
	engine, err := nuclei.NewNucleiEngineCtx(ctx, opts...)
	if err != nil {
		return fmt.Errorf("create nuclei engine failed: %w", err)
	}
	defer engine.Close()

	// 加载模板
	if err := engine.LoadAllTemplates(); err != nil {
		return fmt.Errorf("load templates failed: %w", err)
	}

	// 加载目标
	engine.LoadTargets(task.Targets, true)

	// 执行扫描并处理结果
	fmt.Printf("   ⏳ Task [%s] Scanning...\n", task.ID[:8])
	return engine.ExecuteCallbackWithCtx(ctx, func(ev *output.ResultEvent) {
		if ev == nil || !ev.MatcherStatus {
			return
		}

		severity := ev.Info.SeverityHolder.Severity.String()
		if severity == "" {
			severity = "unknown"
		}

		fmt.Printf("   [VULN] Task [%s] severity=%s template=%s name=%s matched=%s\n",
			task.ID[:8],
			severity,
			ev.TemplateID,
			ev.Info.Name,
			ev.Matched,
		)
	})
}

随后先运行producer.go文件:

=== Redis Stream Producer (FIFO Demo) ===

📤 Adding 5 tasks to stream: scan:tasks
   (Tasks will be consumed in FIFO order by Message ID)

✓ Task 1: ID=ba37e680, Targets=[127.0.0.1:8888]
   → Message ID: 1777369149831-0
✓ Task 2: ID=9df0c69e, Targets=[www.baidu.com 127.0.0.1:8888]
   → Message ID: 1777369149845-0
✓ Task 3: ID=8858fa11, Targets=[www.baidu.com]
   → Message ID: 1777369149857-0
✓ Task 4: ID=9ce2703a, Targets=[127.0.0.1:8888]
   → Message ID: 1777369149869-0
✓ Task 5: ID=1513defc, Targets=[www.baidu.com]
   → Message ID: 1777369149880-0

📊 Stream length: 5 messages
📊 First entry ID: 1777369149831-0 (oldest, will be consumed first)
📊 Last entry ID: 1777369149880-0 (newest, will be consumed last)
📊 Group: scanners, Pending: 0, Consumers: 0

✅ All tasks added in FIFO order!
💡 Run consumer to see tasks consumed in the same order

随后运行consumer.go文件即可:

=== Redis Stream Consumer (FIFO Mode with Distributed Lock) ===
Consumer: consumer-1777369488
Max concurrent: 3
Lock TTL: 6m0s (自动续期间隔: 5m0s)
💡 Tasks will be consumed in FIFO order with distributed lock protection
💡 Priority: PEL tasks → New tasks

🔄 Starting task consumption loop...

[17:44:48.721] 🔒 Task [3c4cf862] Lock acquired, starting scan
   Message ID: 1777369458950-0
   Targets: [www.baidu.com]
   Templates: [local-flask multi-request-check]
   Lock TTL: 6m0s (续期间隔: 5m0s)

[17:44:48.722] 🔒 Task [8ad74325] Lock acquired, starting scan
   Message ID: 1777369458938-0
   Targets: [www.baidu.com 127.0.0.1:8888]
   Templates: [local-flask multi-request-check]
   Lock TTL: 6m0s (续期间隔: 5m0s)

[17:44:48.722] 🔒 Task [07ff4871] Lock acquired, starting scan
   Message ID: 1777369458927-0
   Targets: [127.0.0.1:8888]
   Templates: [local-flask multi-request-check]
   Lock TTL: 6m0s (续期间隔: 5m0s)
   ⏳ Task [07ff4871] Scanning...
   [VULN] Task [07ff4871] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[17:44:49.931] ✅ Task [07ff4871] Completed and ACKed
[17:44:49.934] 🔓 Semaphore released (took 1.21409825s)

[17:44:49.934] 🔒 Task [e02178f0] Lock acquired, starting scan
   Message ID: 1777369458961-0
   Targets: [127.0.0.1:8888]
   Templates: [local-flask]
   Lock TTL: 6m0s (续期间隔: 5m0s)
   ⏳ Task [e02178f0] Scanning...
   [VULN] Task [e02178f0] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
[17:44:50.054] ✅ Task [e02178f0] Completed and ACKed
[17:44:50.054] 🔓 Semaphore released (took 119.452541ms)

[17:44:50.054] 🔒 Task [f89f35ca] Lock acquired, starting scan
   Message ID: 1777369458971-0
   Targets: [www.baidu.com]
   Templates: [multi-request-check]
   Lock TTL: 6m0s (续期间隔: 5m0s)
   ⏳ Task [8ad74325] Scanning...
   [VULN] Task [8ad74325] severity=high template=local-flask name=local_flask_vul_test matched=http://127.0.0.1:8888
   ⏳ Task [f89f35ca] Scanning...
   [VULN] Task [8ad74325] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[17:44:50.633] ✅ Task [8ad74325] Completed and ACKed
[17:44:50.633] 🔓 Semaphore released (took 1.913249584s)
   [VULN] Task [f89f35ca] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[17:44:50.871] ✅ Task [f89f35ca] Completed and ACKed
[17:44:50.871] 🔓 Semaphore released (took 817.130709ms)
   ⏳ Task [3c4cf862] Scanning...
   [VULN] Task [3c4cf862] severity=low template=multi-request-check name=just_a_baidu_test matched=https://www.baidu.com/robots.txt
[17:44:51.607] ✅ Task [3c4cf862] Completed and ACKed
[17:44:51.607] 🔓 Semaphore released (took 2.887172375s)

代码实现细则以及部分知识点:

image-20260428180343305

redis的令牌桶算法

令牌桶算法(Token Bucket Algorithm),个人实现的dast中非常重要的一部分,用来做qps限制,以代码形式表示令牌桶的结果大概如下:

type TokenBucket struct {
    capacity       int64     // 桶最大容量
    tokens         float64   // 当前令牌数量
    rate           float64   // 每秒生成令牌速度
    lastRefillTime int64     // 上次补充时间
}

这四个变量就可以用于描述整个限流状态。此算法实现的大致原理是:每次请求到达时,都会根据当前时间-上次补充时间(lastRefillTime),结合rate来进行计算需要更新的令牌并补充(但不会超过capacity),然后如果桶中的令牌足够,就会允许请求进行下去,否则就拒绝/让请求等待token补充,流程大致如下:

sequenceDiagram
    participant Client as 请求
    participant Bucket as TokenBucket

    Client->>Bucket: Allow()

    Bucket->>Bucket: 获取当前时间 now

    Bucket->>Bucket: now - lastRefillTime

    Bucket->>Bucket: 计算新增 token

    Bucket->>Bucket: 更新 tokens

    Bucket->>Bucket: 判断 token 是否足够

    alt token 足够
        Bucket->>Bucket: 扣减 token
        Bucket-->>Client: 放行
    else token 不够
        Bucket-->>Client: 拒绝
    end

    Bucket->>Bucket: 更新 lastRefillTime

注意这里并不是定时器那种定时往桶里面加令牌,这样设置可以让整体的token添加更合理,如果是定时器那种,长时间未运行任务,计时器就会一直尝试加令牌,即使令牌桶已经满了,消耗服务器资源,不如通过令牌桶算法来在每次的实际请求中按需推导状态并按需添加令牌。具体计算方式是通过lua脚本实现的,这里就不多说了。

————————

基本概念如上,在dast的架构构建中,考虑下来,还是要不同模块设置不同的qps,因为如果只对一个策略做整体的qps限制,由于每个模块的扫描方式不同,无法做到兼顾,还是每个模块做各自的qps限制。那么在实际设置中,需要考虑到如下问题:

在后续了解中,发现可以直接通过golang.org/x/time/rate这个库来解决,直接在代码中针对单ip进行速率限制即可,redis相关的令牌桶算法更适合全局下的限制,和我dast架构设计不适很符合,这里暂时不涉及。

缓存?

后续看需不需要详细看看