asynq:一个由 Go 开发的轻量级的异步定时任务系统

6,640 阅读4分钟

最近开发了一个Go的简单高效的异步任务处理库:Asyqn


安装

要安装asynq库和asynqmon命令行工具,请运行以下命令:

go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon

入门

在本asynq教程中,我们将创建两个程序。

producer.go将创建并定时要由consumer异步处理的任务。

consumer.go 将处理producer创建的任务。

假定在上运行Redis服务器localhost:6379。在开始之前,请确保已安装并运行Redis。

我们需要做的第一件事是创建两个主文件:

mkdir producer consumer
touch producer/producer.go consumer/consumer.go

导入asynq两个文件:

import "github.com/hibiken/asynq"

Asynq使用Redis作为消息代理。使用一种RedisConnOpt类型来指定如何连接到Redis。我们这里将使用RedisClientOpt

// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}

producer.go,我们将创建一个Client实例来创建和定时任务。 在asynq,要执行的工作单元被封装在称为的结构中Task。其中有两个字段:TypePayload

// Task represents a task to be performed.
type Task struct {
    // Type indicates the type of task to be performed.
    Type string

    // Payload holds data needed to perform the task.
    Payload Payload
}

要创建任务,请使用NewTask函数,并为任务传递类型和有效负载。

可以通过Client.Schedule传入任务和需要处理的时间来计划任务。

// producer.go
func main() {
    client := asynq.NewClient(redis)

    // Create a task with typename and payload.
    t1 := asynq.NewTask(
        "send_welcome_email",
        map[string]interface{}{"user_id": 42})

    t2 := asynq.NewTask(
        "send_reminder_email",
        map[string]interface{}{"user_id": 42})

    // Process the task immediately.
    err := client.Schedule(t1, time.Now())
    if err != nil {
        log.Fatal(err)
    }

    // Process the task 24 hours later.
    err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
    if err != nil {
        log.Fatal(err)
    }
}

consumer.go,创建一个Background例来处理任务。

NewBackground函数需要RedisConnOpt和Config

您可以查看有关文档,Config以查看可用的选项。

在此示例中,我们仅指定并发。

// consumer.go
func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    bg.Run(handler)
}

参数t(*asynq.Background).Runasynq.Handler具有一种方法的接口ProcessTask

// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
    ProcessTask(*Task) error
}

实现处理程序的最简单方法是定义一个具有相同type的函数,并asynq.HandlerFunc在将其传递给时使用适配器类型Run

func handler(t *asynq.Task) error {
    switch t.Type {
    case "send_welcome_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to User %d\n", id)

    case "send_reminder_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Reminder Email to User %d\n", id)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}

func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    // Use asynq.HandlerFunc adapter for a handler function
    bg.Run(asynq.HandlerFunc(handler))
}

我们可以继续向该处理函数添加案例,但是在实际应用中,在单独的函数中为每种案例定义逻辑很方便。为了重构我们的代码,让我们创建一个简单的调度程序,将任务类型映射到其处理程序:

// consumer.go

// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
    mapping map[string]asynq.HandlerFunc
}

// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
    d.mapping[taskType] = fn
}

// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
    fn, ok := d.mapping[task.Type]
    if !ok {
        return fmt.Errorf("no handler registered for %q", task.Type)
    }
    return fn(task)
}

func main() {
    d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
    d.HandleFunc("send_welcome_email", sendWelcomeEmail)
    d.HandleFunc("send_reminder_email", sendReminderEmail)

    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })
    bg.Run(d)
}

func sendWelcomeEmail(t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

func sendReminderEmail(t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

现在我们既有任务生产者又有消费者,我们可以运行这两个程序。

go run producer.go

这将创建两项任务:一项应立即处理,另一项将在24小时后处理。

让我们使用asynqmon工具检查任务。

asynqmon stats

你应该能看到,有一个任务Enqueued状态,另一个在Scheduled状态。

注意:如需了解每种状态的含义,请参阅Wiki页面上Life of Task

让我们运行asynqmonwatch命令,以便我们能够连续运行的命令看到的变化。

watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds

最后,让我们启动consumer程序来处理定时的任务。

go run consumer.go

注意:在您发送信号终止程序之前,此操作不会退出。有关如何安全终止后台处理的最佳实践,请参见Signal Wiki页面

您应该能够看到在终端上打印的文本,表明该任务已成功处理。

这是一次asynq基础的快速教程。要了解有关其所有功能(如**优先级队列自定义重试**)的更多信息,请参见的Wiki页面

命令行工具

Asynq附带了一个命令行工具来检查队列和任务的状态。

要安装,请运行以下命令:

go get github.com/hibiken/asynq/tools/asynqmon

完成!

例图:

asynqmon_stats.gif

详情请参考:Asyqn-https://github.com/hibiken/asynq