Tekton Interceptor二次开发拓展

503 阅读5分钟

1.Tekton Interceptor

Tekton Triggers 是一个 Tekton 组件,可让您从各种来源的事件中检测和提取信息,并基于该信息确定性地实例化并执行 TaskRunsPipelineRuns。 Tekton Triggers 还可以将从事件中提取的信息直接传递给 TaskRunsPipelineRuns

Interceptor- 针对特定平台的事件处理器 在之前运行 TriggerBinding使您能够执行负载过滤、验证(使用Secret)、转换、定义和测试触发条件等 有用的处理。 一旦事件数据通过拦截器,它就会进入 Trigger在将有效负载数据传递给之前 TriggerBinding

我们可以看一下TektonTrigger组件:

图片.png

一个 Interceptor是在特定平台之前运行的事件处理器 TriggerBinding.它允许您执行负载过滤、验证(使用秘密)、 转换、定义和测试触发条件,以及实施其他有用的处理。 一旦事件数据通过 Interceptor,然后它转到 Trigger之前会将有效载荷数据到 TriggerBinding. 您还可以使用 Interceptor修改关联的行为 Trigger.

2.自定义Interceptor

Tekton Triggers 附带 ClusterInterceptor自定义资源定义 (CRD),它允许您实现自定义集群范围的 Webhook 形式 Interceptor.

A ClusterInterceptor指定运行自定义业务逻辑的外部 Kubernetes v1 服务,该逻辑从 EventListener通过 HTTP 请求并返回经过处理的负载版本以及 HTTP 200 响应。

为您运行自定义业务逻辑的 Kubernetes 对象 ClusterInterceptor必须满足以下条件:

  • 由监听 HTTP 端口(默认端口为 80)的常规 Kubernetes v1 服务
  • 接受 HTTP POST包含一个请求 InterceptorRequest 作为 JSON 正文
  • 返回一个 HTTP 200 OK 响应,其中包含 InterceptorResponse作为 JSON 正文。 如果触发器处理应该继续,拦截器应该设置 continue响应中的字段为 true. 如果应该停止处理,拦截器应该设置 continue字段为 false并提供详细说明错误的其他信息到status
  • 仅当负载处理因灾难性故障而停止时,才返回 HTTP 200 OK 以外的响应。

2.1 Interceptor

我们可以根据官方示例拓展一个自定义的Interceptor官方示例

Version Interceptor:用来判断Tekton触发器的构建的任务阶段和版本是否符合预期。

github.com/tektoncd/triggers/pkg/interceptors包中,官方提供了对应的Interceptor接口

type InterceptorInterface interface {
	// Process executes the given InterceptorRequest. Simply getting a non-nil InterceptorResponse back is not sufficient
	// to determine if the interceptor processing was successful. Instead use the InterceptorResponse.Status.Continue to
	// see if processing should continue and InterceptorResponse.Status.Code to distinguish between the kinds of errors
	// (i.e user errors vs system errors)
	Process(ctx context.Context, r *InterceptorRequest) *InterceptorResponse
}

我们对应实现该接口即可

// 自定义version interceptor
type VersionInterceptor struct {
	stage string
}

// Process execute interceptor
func (v *VersionInterceptor) Process(ctx context.Context, r *triggersv1.InterceptorRequest) *triggersv1.InterceptorResponse {
}

InterceptorRequest是官方提供的拦截器请求对象,由tekton传入。我们先来看看该对象详细

type InterceptorRequest struct {
	Body string `json:"body,omitempty"`
	Header map[string][]string `json:"header,omitempty"`
	Extensions map[string]interface{} `json:"extensions,omitempty"`
	InterceptorParams map[string]interface{} `json:"interceptor_params,omitempty"`
	Context *TriggerContext `json:"context"`
}

其中:

  • Body:对应使用HTTP触发的请求体内容
  • Header:对应HTTP触发的请求头内容
  • Extensions:对应了在Tekton中上一个拦截器拓展的字段
  • InterceptorParams:对应了在Tekton的eventListener配置拦截器时,传入的字段。

InterceptorResponse是官方提供的拦截器请求对象,返回给Tekton。

type InterceptorResponse struct {
	Extensions map[string]interface{} `json:"extensions,omitempty"`
	Continue bool `json:"continue"` // Don't add omitempty -- it  will remove the continue field when the 
	Status Status `json:"status"`
}
  • Extensions:可以设置相关的拓展字段
  • Continue: 是否继续执行后续触发器流程
  • Status:设置错误状态

2.2 开始实操

1.定义Version Interceptor

type VersionInterceptor struct {
	stage string
}

// Process execute interceptor
func (v *VersionInterceptor) Process(ctx context.Context, r *triggersv1.InterceptorRequest) *triggersv1.InterceptorResponse {
}

2.设置HTTP Server,并对Tekton传入的数据进行反序列化

	// health check
	http.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})

	// execute interceptor handleFunc
	http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		
        // 读取请求体中的数据
		var body []byte
		if all, err := io.ReadAll(r.Body); err != nil {
			fmt.Printf("err: %+v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		} else {
			body = all
		}
		
        // 反序列化InterceptorRequest
		ireq := &triggersv1.InterceptorRequest{}
		if err := json.Unmarshal(body, ireq); err != nil {
			fmt.Printf("err: %+v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		
		vinter := VersionInterceptor{
			stage: *stg,
		}
        
        // 调用实际的Interceptor业务
		iresp := vinter.Process(r.Context(), ireq)
		
        // 序列化InterceptorResponse对象进行返回
		if respBytes, err := json.Marshal(iresp); err != nil {
			fmt.Printf("err: %+v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		} else {
			w.Write(respBytes)
		}

	})
	server := http.Server{
		Addr: ":" + (*port),
	}
	println("version interceptor run on: ", *port)
	if err := server.ListenAndServe(); err != nil {
		panic(err)
	}

3.开始编写实际的拦截器逻辑

由于我们需要获取Tekton的InterceptorParam,故我们需要声明一个mock结构体用于反序列化

mockInterceptorParam:

// 用于反序列Tekton中InterceptorParam
type mockInterceptorParam struct {
	Stage   string `json:"stage"`
	Version string `json:"version"`
}

el.yaml:

spec:
  serviceAccountName: tekton-robot
  triggers:
    - name: first-trigger
      interceptors:
        - ref:
            name: version
          # 以上mockInterceptorParam对应如下字段设置
          params:
            - name: "stage"
              value: "stable"
            - name: "version"
              value:  "v1.0.1"	              

Process:

// Process execute interceptor
func (v *VersionInterceptor) Process(ctx context.Context, r *triggersv1.InterceptorRequest) *triggersv1.InterceptorResponse {
	p := &mockInterceptorParam{}

	// interceptor param 解析错误
        // 使用tekton interceptor包中的反序列化param方法进行序列化
	if err := interceptors.UnmarshalParams(r.InterceptorParams, p); err != nil {
        // 返回错误
		return interceptors.Failf(codes.InvalidArgument, "failed to parse interceptor params: %v", err)
	}

	// stage 不相等
	if v.stage != p.Stage {
		return interceptors.Failf(codes.Canceled, "failed to execute trigger: %v", stageErr)
	}
	// version 非法
	if !semver.IsValid(p.Version) {
		return interceptors.Failf(codes.Canceled, "failed to execute trigger: %v", stageErr)
	}
	
    // 拓展字段
	r.Extensions["version_check"] = "SUCCESS"

	return &triggersv1.InterceptorResponse{
		Extensions: r.Extensions,
		Continue:   true,
	}
}

interceptors.Failf函数实际是对返回InterceptorResponse错误进行一次封装

// 实际调用了Fail函数,
func Fail(c codes.Code, msg string) *triggersv1beta1.InterceptorResponse {
	return &triggersv1beta1.InterceptorResponse{
		Continue: false,
		Status: triggersv1beta1.Status{
			Code:    c,
			Message: msg,
		},
	}
}

至此,我们完成了一个demo级别Tekton Trigger Interceptor拓展。

2.3 Full Code

我们来看一下完整的代码

package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	triggersv1 "github.com/tektoncd/triggers/pkg/apis/triggers/v1beta1"
	"github.com/tektoncd/triggers/pkg/interceptors"
	"golang.org/x/mod/semver"
	"google.golang.org/grpc/codes"
	"io"
	"net/http"
)

// implement triggersv1.InterceptorInterface
var _ triggersv1.InterceptorInterface = (*VersionInterceptor)(nil)

var (
	stg        = flag.String("stage", "stable", "spec stage")
	port       = flag.String("port", "41234", "spec port")
	stageErr   = errors.New("request param stage no match")
	versionErr = errors.New("request param version illegal")
)

func main() {
	flag.Parse()

	// health check
	http.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})

	// execute interceptor
	http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {

		var body []byte
		if all, err := io.ReadAll(r.Body); err != nil {
			fmt.Printf("err: %+v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		} else {
			body = all
		}

		ireq := &triggersv1.InterceptorRequest{}
		if err := json.Unmarshal(body, ireq); err != nil {
			fmt.Printf("err: %+v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		vinter := VersionInterceptor{
			stage: *stg,
		}
		iresp := vinter.Process(r.Context(), ireq)

		if respBytes, err := json.Marshal(iresp); err != nil {
			fmt.Printf("err: %+v\n", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		} else {
			w.Write(respBytes)
		}

	})
	server := http.Server{
		Addr: ":" + (*port),
	}
	println("version interceptor run on: ", *port)
	if err := server.ListenAndServe(); err != nil {
		panic(err)
	}
}

type VersionInterceptor struct {
	stage string
}

type mockInterceptorParam struct {
	Stage   string `json:"stage"`
	Version string `json:"version"`
}

// Process execute interceptor
func (v *VersionInterceptor) Process(ctx context.Context, r *triggersv1.InterceptorRequest) *triggersv1.InterceptorResponse {
	p := &mockInterceptorParam{}

	// interceptor param 解析错误
	if err := interceptors.UnmarshalParams(r.InterceptorParams, p); err != nil {
		return interceptors.Failf(codes.InvalidArgument, "failed to parse interceptor params: %v", err)
	}

	// stage 不相等
	if v.stage != p.Stage {
		return interceptors.Failf(codes.Canceled, "failed to execute trigger: %v", stageErr)
	}
	// version 非法
	if !semver.IsValid(p.Version) {
		return interceptors.Failf(codes.Canceled, "failed to execute trigger: %v", stageErr)
	}

	r.Extensions["version_check"] = "SUCCESS"

	return &triggersv1.InterceptorResponse{
		Extensions: r.Extensions,
		Continue:   true,
	}
}

然后我们再进行对该拦截器进行配置到Tekton中

1.第一种直接通过url配置

apiVersion: triggers.tekton.dev/v1alpha1
kind: ClusterInterceptor
metadata:
  name: version
spec:
  clientConfig:
    url: "http://127.0.0.1:41234/version"

2.第二种可以配置通过k8s的Service进行访问

apiVersion: triggers.tekton.dev/v1alpha1
kind: ClusterInterceptor
metadata:
  name: version
spec:
  clientConfig:
    service:
      name: "my-interceptor-svc"
      namespace: "default"
      path: "/optional-path" # optional
      port: 8081 # defaults to 80