注册

学习笔记-Retrofit源码解析

挖掘Retrofit:2.8.0源码。介绍Retrofit如何完成对OkHttp的封装,以及Retrofit如何支持的协程。

1. Builder

Retrofit通过Retrofit.Builder创建,主要是配置各种工厂Factory

val retrofit = Retrofit.Builder()
.baseUrl("this is baseUrl")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()

Builder.builder()主要有五个步骤:

  1. 获取Platform对象,Platform是平台适配器,主要为了跨平台使用,类如安卓平台实现就是Android
  2. 初始化CallFactroyCallFactroy的作用是生产realCall,网络请求由他发出。默认值是OkHttpOKHttpClient
  3. 初始化Executor,默认通过Platform获取。
  4. 初始化CallAdapter.FactroyCallAdapter.Factroy生产CallAdapterCallAdapterrealCall的适配器,通过对realCall的包装,实现Executorjava8Futruerxjava等调度方法。存在多个,顺序是自定义配置-->默认配置。默认配置通过Platform获得。
  5. 初始化Converter.FactoryConverter.Factory生产ConverterConverter是数据转换器,将返回的数据转换为需要的数据,一般转换为我们要用的对象。存在多个,顺序是内置配置-->自定义配置-->默认配置。默认配置通过Platform获得。
class Builder{

Builder(Platform platform) {
this.platform = platform;
}

public Builder() {
//step 0
this(Platform.get());
}

public Retrofit build() {
if (baseUrl == null) {
throw new IllegalStateException("Base URL required.");
}

// step1
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
}

//step2
Executor callbackExecutor = this.callbackExecutor;
if (callbackExecutor == null) {
callbackExecutor = platform.defaultCallbackExecutor();
}

//step3
List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));

//step4
List<Converter.Factory> converterFactories = new ArrayList<>(
1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());
converterFactories.add(new BuiltInConverters());
converterFactories.addAll(this.converterFactories);
converterFactories.addAll(platform.defaultConverterFactories());

return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
}
}

到这里,Retrofit对象创建完成,一个大致的结构如下图:

retrofit结构.png

2. 创建API对象

Retrofit通过定义网络请求的接口设置请求参数和返回类型,通过调用retrofit.create()创建这个接口的对象,调用这个对象的方法生成最终的网络请求。

interface MyService {
@GET("/user")
fun getUser(): Observable<Response<User>>
}

val myServiceClass: MyService = retrofit.create(MyService::class.java)

进入到create方法,可以看到是直接调用Proxy.newProxyInstance()方法创建出对象。这是标准库提供的动态代理机制,在运行时创建接口的实例对象。

Proxy.newProxyInstance()方法有三个参数:

  • classLoader: 类加载器

  • interfaces:需要实现的接口

  • InvocationHandler:代理方法

public <T> T create(final Class<T> service) {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
private final Object[] emptyArgs = new Object[0];

@Override public @Nullable Object invoke(Object proxy, Method method,
@Nullable Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
});
}

为了更好的理解动态代理,可以当做动态代理自动帮我们生成一个接口的实现类。将所有的方法都通过handler.invoke()代理,如下面的代码所示。只不过动态代理是运行时生成的这个类,并且是直接生成了字节码。

interface MyService {
@GET("/user")
fun getUser(): Observable<Response<User>>

@GET("/name")
fun getName(userId: String): Observable<Response<String>>
}

//自动生成的代码示例
class SuspendServiceProxy implements SuspendService {
InvocationHandler handler;

@NonNull
@Override
public Observable<Response<User>> getUser() {
return handler.invoke(
this,
SuspendService.class.getMethod("getUser", String.class),
new Object[]{}
);
}

@NonNull
@Override
public Observable<Response<String>> getName(@NonNull String userId) {
return handler.invoke(
this,
SuspendService.class.getMethod("getName", String.class),
new Object[]{userId}
);
}
}

3. 创建请求对象

创建好API对象之后,就可以调用它的方法创建请求对象。

val observable = myServiceClass.getUser()

根据前面可以知道,这个方法代理给了InvocationHandler,在这个方法首先判断这个方法对象是不是实体对象,如果是的话就直接调用就行。

如果不是一个对象,就把调用再代理给ServiceMethod。首先调用loadServiceMethod()创建ServiceMethod对象,然后调用invoke()方法得到返回值。

public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
// 判断这个方法对象是不是实体对象,如果是的话就直接调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
// 判断这个方法对象是不是实体对象,如果是的话就直接调用
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}

3.1. 创建ServiceMethod对象

ServiceMethodRetrofit内部的自定义的代理方法,实际的逻辑是交给它处理。

abstract class ServiceMethod<T> {
abstract @Nullable T invoke(Object[] args);
}

loadServiceMethod()方法获取ServiceMethod对象。

方法对象Method作为Key缓存ServiceMethod对象在Retrofit中,因为创建ServiceMethod是一个耗时过程,所以弄成单例模式。

如果拿不到缓存,就调用ServiceMethod.parseAnnotations()创建一个ServiceMethod对象。

private final Map<Method, ServiceMethod<?>> serviceMethodCache = new ConcurrentHashMap<>();


ServiceMethod<?> loadServiceMethod(Method method) {
ServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) return result;

synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = ServiceMethod.parseAnnotations(this, method);
serviceMethodCache.put(method, result);
}
}
return result;
}

ServiceMethod.parseAnnotations()中只做了一件事,创建RequestFactory然后将创建ServiceMethod的工作又交给了子类HttpServiceMethod处理。

RequestFactory是接口的参数配置,通过解析接口的注解,返回值,入参及其注解等获得这些参数。

之后将解析完成的数据传递给HttpServiceMethed,由它继续创建ServiceMethod

abstract class ServiceMethod<T> {
static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
...
return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}

abstract @Nullable T invoke(Object[] args);
}

3.1.1. 创建RequstFactory

RequestFactory内部也是一个Builder模式。主要做了两件事:

  1. 遍历接口注解,初始化配置参数,这里读取的是POSTGET等注释。
  2. 遍历入参及其注解,将入参转换为ParameterHandler对象,将每个参数的设置配置的逻辑代理给了它处理。
final class RequestFactory {
static RequestFactory parseAnnotations(Retrofit retrofit, Method method) {
return new RequestFactory.Builder(retrofit, method).build();
}


}
}

3.1.2.2. OkHttpCall

最终创建的网络请求对象是OkHttpCall

OkHttpCall实现了Retrofit.Call接口,这个接口与OkHttp.Call基本一致,这里只介绍它的三个方法:

  1. execute()同步发起请求并且返回请求体Response
  2. enqueue()异步发起请求,通过Callback通信,需要注意的是处理的回调也是在异步中调用的。
  3. cancel()取消请求。

查看OkHttpCall的实现,可以发现所有的Call接口方法的具体实现都是代理给了rawCall

cancel()直接代理给rawCall.cancel()

execute()代理给rawCall.execute(),将返回值交给parseResponse()转换了一次。

enqueue()代理给rawCall.enqueue(),多加了一层Callback回调,在成功回调中也是交给parseResponse()转换之后再回调给原始的Callback

也就是说OkHttpCall把所有的逻辑静态代理给了rawCall,这样做的好处是可以在对应的地方做一下额外的处理,也就是获得返回值通过parseResponse()转换数据。

fAndroid平台默认的CallBackExecutorPlatform的实现类Android中,将Runnable抛到MainHandler中,实现回调到主线程。
static final class Android extends Platform {

@Override public Executor defaultCallbackExecutor() {
return MainThreadExecutor();
}

static class MainThreadExecutor implements Executor {
private final Handler handler = new Handler(Looper.getMainLooper());

@Override public void execute(Runnable r) {
handler.post(r);
}
}
}
3.1.2.3.2. RxJava2CallAdapterFactory

RxJava2CallAdapterFactory只在返回类型是Observable之类的时候创建CallAdapter

public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {

@Override
public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);

if (rawType != Observable.class) { //省略了其他类型
return null;
}

boolean isResult = false;
boolean isBody = false;
Type responseType;


Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
Class<?> rawObservableType = getRawType(observableType);
if (rawObservableType == Response.class) {
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
}
...
return new RxJava2CallAdapter(...);
}
}

RxJava2CallAdapter中的adapt()Call封装成observable返回。

final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> {

@Override public Object adapt(Call<R> call) {
Observable<Response<R>> observable = new CallExecuteObservable<>(call);
...
return RxJavaPlugins.onAssembly(observable);
}
}

最后进到CallExecuteObservable,在启动的时候调用call.execute()并将结果抛给观察者。

final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;

CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}

@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
Call<T> call = originalCall.clone();
try {
...
Response<T> response = call.execute();
observer.onNext(response);
}
}
}

3.2. 回顾

  1. Api接口实例动态代理HttpServiceMethod
  2. 网络请求的真实执行者是OkHttpClient创建的RealCall
  3. Api接口的调用到RealCall直接有两层静态代理 OkHttpCallCallAdapter
  4. OkHttpCall代理了RealCall,额外调用Converter做序列化处理。
  5. CallAdapter代理了OkHttpCall,可以在这里做扩展,将Call转换为实际的返回类型。

4. 协程实现

在看Retrofit如何实现协程之前,先梳理一下协程的基本概念。

当有一个延迟任务,后续的逻辑又需要等待这个任务执行完成返回数据,能继续执行,为了不阻塞线程,一般就需要就需要通过线程调度和传递回调来通信。在使用了协程之后,却只需要像同步代码那样书写,就可以完成这些操作。

但这并不是什么黑魔法,并不是用了协程之后不需要线程调度和传递回调,而是将这些繁琐的事进行复杂的封装并且为我们自动生成。将回调封装成Continuation,将线程的调度封装成调度器。

Continuation ,调用它的 resume 或者 resumeWithException 来返回结果或者抛出异常,跟我们所说的回调一模一样。

调度器的本质是一个协程拦截器,它拦截的对象就是Continuation,进而在其中实现回调的调度。调度器一般使用现成的,类如Dispatchers.Main,如果去挖它的源码,你会发现到了最后,还是使用的handler.post(),也跟我们所说的线程调度一模一样。

而前面有讲到Retrofit的实现很多时候需要依据返回类型做不同的处理,所以就需要了解协程是如何自动生成的回调代码和如何传递回调。写一个简单的协程接口,看一下转换后的Java代码,以及尝试在Java代码中调用协程接口。

可以看到返回值String被封装成了Continuation<String>作为入参传递,思考一下回调不也是这样实现的。

真实的返回值成了Object(用于状态机状态切换)。

//Kotlin代码
interface SuspendService {
suspend fun C(c1: Long): String
}

//字节码转化的Java代码
public interface SuspendService {
@Nullable
Object C(long var1, @NotNull Continuation var3);
}

//尝试在Java中调用suspend方法
class MMM {
SuspendService service;

public static void main(String[] args) {
MMM mmm = new MMM();
mmm.service.C(1L, new Continuation<String>() {
@NonNull
@Override
public CoroutineContext getContext() {
return null;
}

@Override
public void resumeWith(@NonNull Object o) {

}
});
}
}

接着回到动态代理那部分,因为suspend生成的代码会多加一个回调参数Continuation,那么动态代理的时候这个参数就会传入到代理的handler中。

Continuation的创建和使用十分的繁琐,最好的处理方法应该是把它再丢进一个kotlinsuspend方法中,让编译器去处理这些东西,而这个也就是Retrofit实现协程的原理。

interface SuspendService {
@GET("/user")
suspend fun getUser(): Response<User>

@GET("/name")
suspend fun getName(userId: String): Response<String>
}

//动态代理生成字节码示例
class SuspendServiceProxy implements SuspendService {
InvocationHandler handler;

@Nullable
@Override
public Object getUser(@NonNull Continuation<? super Response<User>> $completion) {
return handler.invoke(
this,
SuspendService.class.getMethod("getUser", Continuation.class),
new Object[]{$completion}
);
}

@Nullable
@Override
public Object getName(@NonNull String userId, @NonNull Continuation<? super Response<String>> $completion) {
return handler.invoke(
this,
SuspendService.class.getMethod("getName", String.class, Continuation.class),
new Object[]{userId, $completion}
);
}
}

接着再回到HttpServiceMethod,看看刚才被省略的代码。

在这里面会判断是不是suspend方法,判断的逻辑在RequestFactory中,判断的方法就是判断参数有没有Continuation对象,感兴趣可以去RequestFactory源码瞅瞅。

现在如果是suspend方法,会直接自定义一个类型adapterType,它的实际类型是Call,泛型是实际的返回类型(Response<T>里面的T)。之后将他作为返回类型去创建CallAdapter,而这里实际创建的就是DefaultCallAdapterFactoryExecutorCallbackCall。最后创建SuspendForResponse对象返回。

abstract class HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT> {

static <ResponseT, ReturnT> retrofit2.HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
Retrofit retrofit, Method method, RequestFactory requestFactory) {

/**
* 通过判断参数是不是Continuation,标志函数是不是suspend
* 在requestFactory内部处理
*/

boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
...
Annotation[] annotations = method.getAnnotations();
Type adapterType;
if (isKotlinSuspendFunction) {
Type[] parameterTypes = method.getGenericParameterTypes();
Type responseType = Utils.getParameterLowerBound(0,
(ParameterizedType) parameterTypes[parameterTypes.length - 1]);

if (getRawType(responseType) == Response.class && responseType instanceof ParameterizedType) {
// Unwrap the actual body type from Response<T>.
responseType = Utils.getParameterUpperBound(0, (ParameterizedType) responseType);
continuationWantsResponse = true;
}
/**
* 自己新建一个返回类型,将实际的返回例行包装给Call
*/

adapterType = new Utils.ParameterizedTypeImpl(null, Call.class, responseType);
} else {
adapterType = method.getGenericReturnType();
}

CallAdapter<ResponseT, ReturnT> callAdapter =
createCallAdapter(retrofit, method, adapterType, annotations);
Type responseType = callAdapter.responseType();

Converter<ResponseBody, ResponseT> responseConverter =
createResponseConverter(retrofit, method, responseType);

okhttp3.Call.Factory callFactory = retrofit.callFactory;
if (!isKotlinSuspendFunction) {
return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
} else {
return (HttpServiceMethod<ResponseT, ReturnT>) new SuspendForResponse<>(requestFactory,
callFactory, responseConverter, (CallAdapter<ResponseT, Call<ResponseT>>) callAdapter);
}
}
}

SuspendForResponse中,adapt()的返回类型对应到了suspend的返回类型Object。并且其中的逻辑就是解析出CallContinuation对象,然后有调用KotlinExtensions.awaitResponse(),就如之前说的,它是一个suspend方法,在代理中不处理Continuation,而是交给编译器去处理。

static final class SuspendForResponse<ResponseT> extends HttpServiceMethod<ResponseT, Object> {
private final CallAdapter<ResponseT, Call<ResponseT>> callAdapter;

SuspendForResponse(RequestFactory requestFactory, okhttp3.Call.Factory callFactory,
Converter<ResponseBody, ResponseT> responseConverter,
CallAdapter<ResponseT, Call<ResponseT>> callAdapter) {
super(requestFactory, callFactory, responseConverter);
this.callAdapter = callAdapter;
}

@Override
protected Object adapt(Call<ResponseT> call, Object[] args) {
call = callAdapter.adapt(call);

Continuation<Response<ResponseT>> continuation =
(Continuation<Response<ResponseT>>) args[args.length - 1];
...
return KotlinExtensions.awaitResponse(call, continuation);
}
}

KotlinExtensions.awaitResponse()Call的扩展函数,扩展函数的实现是通过静态方法传入this的方法实现的,所以前面传入awaitResponse()的参数有两个,分别是Call对象和Continuation对象。

KotlinExtensions.awaitResponse()的主体是suspendCancellableCoroutine方法,suspendCancellableCoroutine运行在协程当中并且帮我们获取到当前协程的 CancellableContinuation 实例,CancellableContinuation是一个可取消的Continuation。通过调用它的 invokeOnCancellation 方法可以设置一个取消事件的回调,一旦这个回调被调用,那么意味着调用所在的协程被取消了,这时候我们也要相应的做出取消的响应,也就是把OkHttp发出去的请求给取消掉。这段建议多读几遍。

之后调用Call.enqueue()发送网络请求,在Callback中调用CancellableContinuation的 resume 或者 resumeWithException 来返回结果或者抛出异常。

这里的Callback也是经过了callAdapterOkHttpCall处理,乏了。

suspend fun <T> Call<T>.awaitResponse(): Response<T> {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
}

override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}

//扩展函数示例
fun <T> Call<T>.awaitResponse(){
toString()
}

//扩展函数示例转换为Java代码
public static final void awaitResponse(@NotNull Call $this$awaitResponse) {
$this$awaitResponse.toString();
}

0 个评论

要回复文章请先登录注册