Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

github.com/JasonkayZK/async

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/JasonkayZK/async

  • v0.0.0-20200711133823-346a8a215186
  • Source
  • Go
  • Socket score

Version published
Created
Source

使用装饰器模式给你的异步回调设置超时重试

背景

在许多场景下,都需要使用异步回调;

例如:在调用批量发送邮件接口后,邮件往往不是马上发送成功,而是需要等待一段时间才能发送成功;而此时,为了节省调用方的效率,服务方可能会返回一个类似于LogId的记录,此后可以使用此LogId来查询邮件的发送状态;

但是,状态的结束时间是未知的,所以我们需要定时去重新请求查询记录接口;此时我们的代码可能是类似于下面的:

func XXXService() {
    ...
    go XXXCallback()
    ...
}

func XXXCallback() {
    for retry := 5; retry > 0; retry-- {
        // Do something
        ...
        if notFullfill {
            time.Sleep(5 * time.Second)
            continue
        }
    }
}

但是如果回调多起来,每个回调函数都要写一段重试代码,而非专注于处理回调这个业务逻辑本身,此时可以使用装饰器模式给你的Callback做一层包装;

反射创建装饰器

在Python中存在装饰器,在Java的Spring框架中更是大量使用AOP编程;那么在Go这种静态语言中怎么实现类似于AOP的功能呢?

答案很简单:反射!

通过反射我们可以获取运行时回调函数的参数,返回值等函数签名信息;进而构造出一个包括了回调函数,但是包括失败重试的函数(有点类似于Java中创建一个代理对象);

得益于在Go中函数是一等公民,我们可以很轻松的实现上面所述;

下面的decorateCallbackWithAttempt函数就实现了将一个函数装饰为一个含有重试机制的新函数:

func decorateCallbackWithAttempt(decoPtr, fn interface{}, retryInterval int64, attempts int64) (err error) {
	defer func() {
		if err1 := recover(); err1 != nil {
			err = fmt.Errorf("decorator err: %v", err1)
		}
	}()

	var decoratedFunc, targetFunc reflect.Value

	decoratedFunc = reflect.ValueOf(decoPtr).Elem()
	targetFunc = reflect.ValueOf(fn)

	v := reflect.MakeFunc(targetFunc.Type(),
		func(in []reflect.Value) (out []reflect.Value) {
			for retry := attempts; retry > 0; retry-- {
				hasErr := false
				// Call callback func
				out = targetFunc.Call(in)

				// Has return val
				if valuesNum := len(out); valuesNum > 0 {
					resultItems := make([]interface{}, valuesNum)
					// Check value

					for k, val := range out {
						resultItems[k] = val.Interface()
						// Has error
						if _, ok := resultItems[k].(error); ok {
							hasErr = true
							break
						}
					}

					// Has err, retry
					if hasErr {
						time.Sleep(time.Duration(retryInterval) * time.Second)
						fmt.Printf("retry %d\n", retry)
						continue
					}
					return
				}
			}
			return out
		})

	decoratedFunc.Set(v)
	return
}

decorateCallbackWithAttempt函数接收四个参数:

  • decoPtr:装饰函数返回值(待装饰函数的指针类型);
  • fn:待装饰函数;
  • retryInterval:错误重新尝试时间间隔;
  • attempts:错误重新尝试次数;

可以看到,通过reflect.MakeFunc,我们可以很轻松的构建一个装饰器函数;

下面测试是调用上面的decorateCallbackWithAttempt函数创建一个装饰器函数并调用:

func errorCallbackTest(str string) (res string, err error) {
	defer func() {
		if err1 := recover(); err1 != nil {
			err = fmt.Errorf("callback err: %v", err1)
		}
	}()

	if str == "error" {
		return "", fmt.Errorf("mock err: %s", str)
	}
	if str == "panic" {
		panic(fmt.Errorf("panic: %s", str))
	}

	fmt.Printf("Test msg: %s\n", str)
	return fmt.Sprintf("Test msg: %s", str), nil
}

func Test_decorateCallbackWithAttempt(t *testing.T) {
	decoratedCallback := errorCallbackTest
	err := decorateCallbackWithAttempt(&decoratedCallback, errorCallbackTest, 1, 3)

	str, err := decoratedCallback("good")
	if err != nil {
		panic(err)
	}
	fmt.Println(str)

	str, err = decoratedCallback("error")
	if err != nil {
		panic(err)
	}
	fmt.Println(str)
}

采用传入指针构造而非通过返回值返回构造的装饰器函数是因为:

由于使用了反射,所以返回的装饰器函数一定是interface{}类型的,还需要做强制类型转换,类似于下面的例子,有些不优雅:

decoratedCallback := decorateCallbackWithAttempt(errorCallbackTest, 1, 3)
f, _ := (*decoratedCallback).(func(string) (string, error))
f()

上面的测试最终函数输出:

=== RUN   Test_decorateCallbackWithAttempt
Test msg: good
retry 3
retry 2
retry 1
mock err: error
--- PASS: Test_decorateCallbackWithAttempt (3.00s)
PASS

可见在error情况下的确进行了重试;

但是上面的代码是同步调用的,即:必须等待错误尝试完成后,函数才会真正返回!

创建新的协程调用

我们可以简单的使用go命令来做到创建一个独立的协程来完成回调:

func Test_decorateCallbackWithAttempt(t *testing.T) {
	decoratedCallback := errorCallbackTest
	err := decorateCallbackWithAttempt(&decoratedCallback, errorCallbackTest, 1, 3)
	if err != nil {
		panic(err)
	}
	go decoratedCallback("good")
	go decoratedCallback("error")

	time.Sleep(time.Second * 5)
}

此时,两个callback是并行完成的;

考虑到当存在大量的go创建协程时会造成大量的压力,我们也可以使用类似于github.com/panjf2000/ants的协程池来提高性能;

通过反射调用

上面创建的装饰器函数调用起来还是要创建一个新的函数,然后自己手动调用;那么我们可不可以屏蔽这个操作;让函数使用者直接传入函数,而不必自己调用,从而不需要关心,到底是使用go还是协程池来管理的当前callback协程的;

答案依然是:使用反射调用!

我们可以首先创建一个装饰后的函数,然后通过反射来调用它!

代码如下:

func ExecuteCallbackWithAttempt(fn interface{}, retryInterval int64, attempts int64, params ...interface{}) (err error) {
	defer func() {
		if err1 := recover(); err1 != nil {
			err = fmt.Errorf("decorator err: %v", err1)
		}
	}()

	decoPtr := fn
	err = decorateCallbackWithAttempt(&decoPtr, fn, retryInterval, attempts)
	if err != nil {
		return err
	}

	paramNum := len(params)
	callParams := make([]reflect.Value, paramNum)
	if paramNum > 0 {
		for k, v := range params {
			callParams[k] = reflect.ValueOf(v)
		}
	}

	go reflect.ValueOf(decoPtr).Call(callParams)

	return nil
}

上面的代码使用go创建了一个协程,并在创建失败时返回err;

当然,你也可以使用协程池来管理你的callback协程,这些底层实习对于调用方来说都是屏蔽的,调用方只需要传入函数和超时参数即可;

一个调用的例子如下:

func TestAsyncCallbackWithAttempt(t *testing.T) {
	_ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "good!")
	_ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "error")
	_ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "panic")

	time.Sleep(time.Second * 5)
}

输出如下:

=== RUN   TestAsyncCallbackWithAttempt
Test msg: good!
retry 3
retry 3
retry 2
retry 2
retry 1
retry 1
--- PASS: TestAsyncCallbackWithAttempt (5.00s)
PASS

需要注意的是:

当原Callback中不处理panic时,也会直接导致当前线程被终止:

例如,当注释掉errorCallbackTest中捕获的panic,再次运行上面的测试用例,整个测试会直接终止!

func errorCallbackTest(str string) (res string, err error) {
	//defer func() {
	//	if err1 := recover(); err1 != nil {
	//		err = fmt.Errorf("callback err: %v", err1)
	//	}
	//}()

	if str == "error" {
		return "", fmt.Errorf("mock err: %s", str)
	}
	if str == "panic" {
		panic(fmt.Errorf("panic: %s", str))
	}

	fmt.Printf("Test msg: %s\n", str)
	return fmt.Sprintf("Test msg: %s", str), nil
}

总结

上面的代码虽然仅仅起到了抛砖引玉的作用;

但是,相信读完了上面的内容,你可以很轻易的构造一个更为复杂的装饰器,或者通过反射做一些更为令人兴奋的事情;

同时,对于上述代码的不足,也请提出批评意见!

FAQs

Package last updated on 11 Jul 2020

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc