Observable sequence called several times on retryWhen, but should call only once


I am trying to build an RxSwift auth token refresh service following this tutorial: https://www.donnywals.com/building-a-concurrency-proof-token-refresh-flow-in-combine/. However, I ran into an issue: when the user doesn't have an auth token and the first refresh fails but the second refresh succeeds, an additional refresh request is sent, and the main endpoint is called only after this third request completes.

So, this is what I see in the network inspector:

  • request to refresh token (failed)
  • request to refresh token (succeeded)
  • request to refresh token (succeeded)
  • request to main endpoint (succeeded)

But it should be:

  • request to refresh token (failed)
  • request to refresh token (succeeded)
  • request to main endpoint (succeeded)

I have the following code for the Authenticator:

protocol AuthenticatorType {
    func authenticate() -> Observable<Void>
    func checkForValidAuthTokenOrRefresh(forceRefresh: Bool) -> Observable<Void>
}

extension AuthenticatorType {
    func checkForValidAuthTokenOrRefresh(forceRefresh: Bool = false) -> Observable<Void> {
        return checkForValidAuthTokenOrRefresh(forceRefresh: forceRefresh)
    }
}

final class Authenticator<Provider: RxMoyaProviderType> where Provider.Target == AuthAPI {
    private let provider: Provider
    private let cookiesStorageProvider: CookiesStorageProviderType
    private let queue = DispatchQueue(label: "Autenticator.\(UUID().uuidString)")
    
    private var refreshInProgressObservable: Observable<Void>?
    
    init(
        provider: Provider,
        cookiesStorageProvider: CookiesStorageProviderType
    ) {
        self.provider = provider
        self.cookiesStorageProvider = cookiesStorageProvider
    }
    
    func checkForValidAuthTokenOrRefresh(forceRefresh: Bool = false) -> Observable<Void> {
        return queue.sync { [weak self] in
            self?.getCurrentTokenOrRefreshIfNeeded(forceRefresh: forceRefresh) ?? .just(())
        }
    }
    
    func authenticate() -> Observable<Void> {
        provider.request(.authenticate(credentials: .defaultDebugAccount))
            .map(LoginResponse.self)
            .map { loginResponse in
                guard loginResponse.login else {
                    throw AuthenticationError.loginRequired
                }
            }
            .asObservable()
    }
}

// MARK: - Helper methods
private extension Authenticator {
    func getCurrentTokenOrRefreshIfNeeded(forceRefresh: Bool = false) -> Observable<Void> {
        if let refreshInProgress = refreshInProgressObservable {
            return refreshInProgress
        }
        
        if cookiesStorageProvider.isHaveValidAuthToken && !forceRefresh {
            return .just(())
        }
        
        guard cookiesStorageProvider.isHaveValidRefreshToken else {
            return .error(AuthenticationError.loginRequired)
        }
        
        let refreshInProgress = provider.request(.refreshToken)
            .share()
            .map { response in
                guard response.statusCode != 401 else {
                    throw AuthenticationError.loginRequired
                }
                return response
            }
            .map(RefreshReponse.self)
            .map { refreshResponse in
                guard refreshResponse.refresh else {
                    throw AuthenticationError.loginRequired
                }
            }
            .asObservable()
            .do(
                onNext: { [weak self] _ in self?.resetProgress() },
                onError: { [weak self] _ in self?.resetProgress() }
            )
        
        refreshInProgressObservable = refreshInProgress
        return refreshInProgress
    }
    
    func resetProgress() {
        queue.sync { [weak self] in
            self?.refreshInProgressObservable = nil
        }
    }
}

And this is how I make a request (including the token refresh logic):

    func request(_ token: Target, callbackQueue: DispatchQueue?) -> Observable<Response> {
        authenticator.checkForValidAuthTokenOrRefresh()
            .flatMapLatest { [weak self] res -> Observable<Response> in
                self?.provider.request(token).asObservable() ?? .empty()
            }
            .map { response in
                guard response.statusCode != 401 else {
                    throw AuthenticationError.loginRequired
                }
                return response
            }
            .retry { [weak self] error in
                error.flatMap { error -> Observable<Void> in
                    guard let authError = error as? AuthenticationError, authError == .loginRequired else {
                        return .error(error)
                    }
                    
                    return self?.authenticator.checkForValidAuthTokenOrRefresh(forceRefresh: true) ?? .never()
                }
            }
    }

At first, I thought it was a concurrency problem, so I replaced the queue with an NSLock, but nothing changed. I also tried subscribe(on:) and observe(on:), which had no effect either.

Maybe the issue is in the do block, where I set refreshInProgressObservable to nil: when I change onError to afterError, I no longer see the third request to refresh the token, but I also don't see any request to the main endpoint.

I even tried removing share(), but as you might guess, that didn't help either.

Also, I noticed that the third request fires immediately after the second one completes, even if I add a sleep at the beginning of the getCurrentTokenOrRefreshIfNeeded method, which is rather strange.


Edit

I tried another way to refresh the token, using a deferred block in an Observable (inspired by Daniel's tutorial).

Here is my code:

final class NewProvider {
    let authProvider: MoyaProvider<AuthAPI>
    let apiProvider: MoyaProvider<AppAPI>
    
    let refreshToken: Observable<Void>
    
    init(authProvider: MoyaProvider<AuthAPI>, apiProvider: MoyaProvider<AppAPI>) {
        self.authProvider = authProvider
        self.apiProvider = apiProvider
        
        refreshToken = authProvider.rx.request(.refreshToken)
            .asObservable()
            .share()
            .map { _ in }
            .catchAndReturn(())
    }
    
    func request(_ token: AppAPI) -> Observable<Response> {
        Observable<Void>
            .deferred {
                if CookiesStorageProvider.isHaveValidAuthToken {
                    return .just(())
                } else {
                    throw AuthenticationError.loginRequired
                }
            }
            .flatMapLatest { [weak self] _ in
                self?.apiProvider.rx.request(token).asObservable() ?? .never()
            }
            .retry { [weak self] error in
                return error.flatMapLatest { [weak self] _ in
                    self?.refreshToken ?? .never()
                }
            }
    }
}

It works perfectly for a single request: it sends the refresh request only when the auth token is missing, and it tries to refresh the token again if the refresh fails.

However, there is a problem with multiple requests. If there is no auth token and multiple requests are fired, it works well: the requests wait for the token to refresh. But if the token refresh fails, there is no attempt to refresh the token again. I don't know what leads to this behaviour.


EDIT 2

I found out that if I place

.observe(on: SerialDispatchQueueScheduler(queue: queue, internalSerialQueueName: "test1"))

after .share(), like this:

        refreshToken = authProvider.rx.request(.refreshToken)
            .asObservable()
            .share()
            .observe(on: SerialDispatchQueueScheduler(queue: queue, internalSerialQueueName: "test1"))
            .map { _ in }
            .catchAndReturn(())

everything works as expected, but now I can't understand why it works this way.

Answer by Daniel T.

Okay, I pulled down your code and spent a good chunk of the day looking it over. A couple of review points:

  • This is way more complex than it needs to be for what it's doing.
  • Any time you have a var Observable, you are doing something wrong. Observables and Subjects should always be let.
  • There is no reason or need to use a DispatchQueue the way you did for Observables. This code doesn't need one at all, but even if it did, you should be passing in a Scheduler instead of using queues directly.
  • I could see no way for your code to actually use the new token in the retry once it has been received. Even if these tests did pass, the code still wouldn't work.

As far as this specific question is concerned, the fundamental problem is that you are calling getCurrentTokenOrRefreshIfNeeded(forceRefresh:) four times in the offending test and creating three refreshInProgress Observables. You are making three of them because the second one has emitted a result and been disposed before the last call to the function is made. Each one emits a value, so you end up with three next events in authAPIProviderMock.recordedEvents.
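
To make that mechanism concrete, here is a minimal sketch in plain Swift. It is a toy model of the caching logic only and does not use RxSwift: RefreshCache, currentOrNewRefresh() and finish() are hypothetical names, and the order of the four calls is an assumed one chosen for illustration, not the exact order in your test.

```swift
// Toy model of the caching logic only; it does not use RxSwift.
// `RefreshCache` and its members are hypothetical names for illustration.
final class RefreshCache {
    private(set) var createdCount = 0   // how many refreshes were started
    private var inProgress: Int?        // id of the refresh in flight, if any

    // Mirrors getCurrentTokenOrRefreshIfNeeded: reuse the in-flight
    // refresh if there is one, otherwise start a new one.
    func currentOrNewRefresh() -> Int {
        if let id = inProgress { return id }
        createdCount += 1
        inProgress = createdCount
        return createdCount
    }

    // Mirrors resetProgress(): runs when a refresh emits a value or an error.
    func finish() {
        inProgress = nil
    }
}

// Assumed call order: four calls, three refreshes created.
let cache = RefreshCache()
let first = cache.currentOrNewRefresh()    // starts refresh 1
let second = cache.currentOrNewRefresh()   // reuses refresh 1, still in flight
cache.finish()                             // refresh 1 fails, cache is cleared
let third = cache.currentOrNewRefresh()    // starts refresh 2
cache.finish()                             // refresh 2 succeeds, cache is cleared
let fourth = cache.currentOrNewRefresh()   // cache is empty again, so refresh 3 starts
print(first, second, third, fourth, cache.createdCount)
```

Because the cache is cleared as soon as a refresh emits, any call that arrives after that point starts a fresh request, even when the previous refresh succeeded.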

What is the fix? I could not find one without making major changes to the basic structure/architecture of the code. All I can do at this point is suggest that you check out my article on this subject, "RxSwift and Handling Invalid Tokens", which contains working code for this use case and includes unit tests. Or revisit Donny's article, which I presume works, but since there are no unit tests for his code, I can't be sure.


Edit

In answer to your question in the comments, here is how you would solve the problem using my service class:

First, create a tokenAcquisitionService object. Since you don't actually need to pass a token value around, just use Void for the token type.

let tokenAcquisitionService = TokenAcquisitionService(initialToken: (), getToken: { _ in URLSession.shared.rx.response(request: refreshTokenRequest) }, extractToken: { _ in })

(Use whatever you want in place of URLSession.shared.rx.response(request: refreshTokenRequest). The only requirement is that it returns an Observable<(response: HTTPURLResponse, data: Data)>. In this case the data can simply be Data() or anything else, since it is ignored. It can even present a view controller that asks the user to log in.)

Now, at the end of every request, include the following:

    .do(onNext: { response in
        guard response.response.statusCode != 401 else { throw TokenAcquisitionError.unauthorized }
    })
    .retry(when: { $0.renewToken(with: tokenAcquisitionService) })

Wrap the above however you want so you don't have to copy and paste it onto every request.

QED