Ich sah eine Frage auf StackOverflow vor kurzem, die mich zum Nachdenken gebracht. Wie kann man HTTP-Aufrufe mit rxjs begrenzen? Einige APIs haben Quoten, bei denen man nur eine bestimmte Anzahl von Anfragen in einem bestimmten Zeitraum machen kann, zum Beispiel 5 Anfragen in 1 Sekunde. In diesem Beitrag bleibe ich bei dem Beispiel mit den 5 Anfragen pro Sekunde. Bei dieser Art von Quoten werden in der Regel gleitende Fenster zum Zählen der Anfragen verwendet. Wenn wir also 10 Anfragen stellen und alle senden wollen, müssen wir zumindest einige von ihnen drosseln, sonst erreichen wir die Quote und erhalten eine nette, aber nicht sehr wünschenswerte Fehlermeldung. Wie sollten wir das angehen? Lesen Sie diesen Blogbeitrag und erfahren Sie mehr über die Ratenbegrenzung in rxjs.
Das Lösen von Problemen mit rxjs fühlt sich an wie das Spielen mit LEGO. Wir verbinden Operatoren miteinander, bis sie das tun, was wir wollen. Rxjs bietet viele Operatoren, die für dieses Szenario nützlich sein könnten, aber wir müssen vorsichtig sein, weil einige von ihnen nicht verlustfrei sind. Der Operator throttleTime beispielsweise lässt nach Ablauf einer bestimmten Zeitspanne jeweils nur einen Wert durch; die übrigen Ereignisse werden verworfen. Das Verwerfen ist nicht hilfreich. Wir wollen alle Anfragen senden; wir wollen nur, dass einige Anfragen ein wenig verzögert werden.
Glücklicherweise verfügt rxjs über einen delay operator, der den angegebenen Wert nach einer bestimmten Zeitspanne ausgibt. Damit können wir berechnen, wie lange eine Anfrage warten muss, bis sie gesendet werden kann. Die erste Anfrage kann sofort abgeschickt werden. Wir können die Anfragen gleichmäßig über den Zeitraum verteilen, indem wir die zweite Anfrage um 200 ms verzögern, die dritte Anfrage um 400 ms usw. Auf diese Weise müssen wir verfolgen, um wie viel wir eine Anfrage verzögern müssen, indem wir die Verzögerung erhöhen, wenn eine Anfrage gesendet wird, aber sie mit der Zeit verringern. So können wir sicher sein, dass wir nicht mehr Anfragen senden, als wir dürfen. Wenn wir jedoch nur 3 Anfragen auf einmal senden, sollte keine davon verzögert werden. Wir können die Anfragen in kurzen Stößen senden; wir müssen sie nicht unbedingt gleichmäßig über die Zeit verteilen.
Ein anderer Ansatz ist die Verwendung eines Token-Bucket-Algorithmus. Angenommen, wir haben 5 Token, weil wir 5 Anfragen senden können. Wenn wir eine Anforderung senden, verbrauchen wir ein Token. Wir können nur dann eine Anfrage senden, wenn wir ein Token haben. Ein verbrauchter Token kann nach einer bestimmten Dauer wiederverwendet werden; in unserem Beispiel können verbrauchte Token nach 1 Sekunde wiederverwendet werden. Auf diese Weise können wir, wenn wir über Token verfügen, kurze Stöße machen, aber wir werden nicht in der Lage sein, Anfragen zu senden, die die Quote überschreiten.
Erstellen wir nun einen rxjs-Operator, der Werte auf der Grundlage des beschriebenen Token-Bucket-Algorithmus ausgibt. Der Umriss des rxjs Operators sollte in etwa wie folgt aussehen:
Sie können angeben, wie viele Werte ausgegeben werden können und wie groß das Schiebefenster ist. Der Scheduler-Parameter ist beim Testen des Operators nützlich. Jetzt brauchen wir Token und eine Möglichkeit, sie zu verbrauchen und zu erneuern.
Wir haben also eine bestimmte Anzahl von Token. Wir brauchen tokenChanged , um zu wissen, wann ein Token verbraucht oder erneuert wird. Es handelt sich um ein BehaviorSubject, so dass es den letzten Wert ausgibt, wenn es von jemandem abonniert wird. Die Funktion consumeToken verringert die Anzahl der Token und teilt tokenChanged mit, dass dies allen mitgeteilt werden soll; renewToken ist dasselbe, erhöht aber die Anzahl der Token. Mit dem Filteroperator erstellen wir die Observable availableTokens, die nur Werte ausgibt, wenn es Token gibt, die wir verwenden können. Nun müssen wir dies auf die Werte anwenden, die von thisObservablekommen. Für jeden Wert müssen wir also auf ein verfügbares Token warten, dann ein Token nehmen und es verbrauchen, dann den Wert ausgeben und nach Ablauf der slidingWindowTime das Token erneuern. Das sieht wie folgt aus:
Hier sorgt der Operator take(1) dafür, dass wir nur 1 Token verbrauchen. Mit dem map-Operator können wir den gewünschten Wert ausgeben, ein Token verbrauchen und einen Timer zur Erneuerung dieses Tokens einrichten. Das Ergebnis dieses Codeschnipsels ist ein Observable, das manchmal einen einzigen Wert ausgibt. Das Einzige, was noch zu tun ist, ist, diese Observables in einem einzigen Observable zu sammeln. Wir können dies mit dem mergeMap-Operator tun, der unsere Observables sammelt und die von ihnen kommenden Werte ausgibt. Mit allem zusammen sieht der Operator wie folgt aus:
Unit-Tests von asynchronem Code können einschüchternd sein, aber rxjs hat eine Sache, die Marble-Tests genannt wird. Eine Murmel ist eine Zeichenkette wie 'a-b', wobei a und b für ausgegebene Werte stehen und - für den Ablauf der Zeit, so dass die Beispielmurmel bedeutet, dass das Observable anfangs den Wert a ausgibt und nach einiger Zeit den Wert b. Kurz und knapp. Ein Beispieltest würde wie folgt aussehen:
Hier erstellen wir einen TestScheduler, um den Ablauf der Zeit zu emulieren. Wir erstellen eine Quellmurmel und eine weitere Murmel, um das Ergebnis zu überprüfen. Wir sehen, dass die Fand-G-Werte zu einem späteren Zeitpunkt ausgegeben werden sollen, da sie nicht in das Fenster passen.
Rxjs ist eine leistungsfähige Methode zur Handhabung asynchroner Operationen. Mir persönlich gefällt die Möglichkeit, die vorhandenen Operatoren als Baustein zu verwenden, um komplexere Operatoren zu erstellen, die meinen Anforderungen entsprechen. Mir gefällt auch, dass es sehr einfach ist, mit marbles Tests dafür zu schreiben. Dies ist natürlich nur meine Meinung. Es gibt andere, vielleicht bessere, Lösungen da draußen. Probieren Sie es selbst aus. Wenn Sie mit dem Code herumspielen möchten, finden Sie ihn hier.