Using rxjs to throttle a callback


I am consuming an API in which I register a callback that occurs frequently.

<pre class="lang-js prettyprint-override">function myCallback(event) {
  // do things with event, something computationally intensive
}

const something = createSomething({
  onSomethingHappened: myCallback
})</pre>

I'd like to limit the rate at which this callback fires, probably using <em>throttle</em>. This project uses Angular, which bundles RxJS. How can I adapt my code so that myCallback is throttled at 300 ms using RxJS?

I have a basic grasp of how observables work, but I've found it confusing to work out how the callback interface converts to an observable interface.

(edited as answers come)


I think you can use fromEventPattern:

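<pre class="lang-js prettyprint-override">import { fromEventPattern } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';

let something;

// addHandler: called on subscribe, registers RxJS's handler as the callback
const src$ = fromEventPattern(
  handler => (something = createSomething({ onSomethingHappened: handler })),
);

src$.pipe(
  throttleTime(300),
  map(args => myCallback(args))
).subscribe(); // nothing is registered or run until you subscribe</pre>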

Note: this assumes that myCallback is a synchronous operation.

The first argument passed to fromEventPattern is the addHandler. You can also pass a removeHandler as the second argument, where you can put your teardown logic (e.g. releasing memory, nulling out values, etc.).
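Here is a minimal sketch of such an addHandler/removeHandler pair. The question doesn't say how the real API unregisters a callback, so the `dispose()` method (and the `fire()` helper used to trigger events) on the stand-in `createSomething` below are assumptions made up for illustration. The pair is exercised by hand so the snippet runs without RxJS; in real code you would pass both functions to `fromEventPattern(addHandler, removeHandler)`.

```javascript
// Hypothetical stand-in for the real API (an assumption, not the actual library):
// fire() triggers the callback, dispose() unregisters it.
function createSomething({ onSomethingHappened }) {
  let callback = onSomethingHappened;
  return {
    fire: event => { if (callback) callback(event); },
    dispose: () => { callback = null; },
  };
}

// addHandler registers the handler; fromEventPattern keeps its return value.
const addHandler = handler => createSomething({ onSomethingHappened: handler });

// removeHandler receives the same handler plus whatever addHandler returned.
const removeHandler = (handler, something) => something.dispose();

// Exercise the pair by hand, in the order subscribe/unsubscribe would.
const received = [];
const handler = event => received.push(event);
const something = addHandler(handler);
something.fire('a');                 // delivered
removeHandler(handler, something);   // teardown
something.fire('b');                 // ignored after teardown
console.log(received);               // [ 'a' ]
```

If the real API has no way to unregister, you can simply omit removeHandler; the callback then stays registered after you unsubscribe.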

<hr />

To get a better understanding of what handler is and why it is used there, let's see how fromEventPattern is implemented:

<pre class="lang-js prettyprint-override">return new Observable<T | T[]>(subscriber => {
  const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);

  let retValue: any;
  try {
    retValue = addHandler(handler);
  } catch (err) {
    subscriber.error(err);
    return undefined;
  }

  if (!isFunction(removeHandler)) {
    return undefined;
  }

  // This returned function will be called when the observable
  // is unsubscribed. That is, on manual unsubscription, on complete, or on error.
  return () => removeHandler(handler, retValue);
});</pre>


As you can see, through handler, you can let the returned observable know when it's time to emit something.
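To make the role of handler more concrete, here is a stripped-down stand-in written in plain JavaScript. It is not the RxJS implementation: `miniFromEventPattern` and the `listeners` registry are hypothetical names, there is no Observable class, no error handling, and no operators. It only mirrors the flow: register on subscribe, forward each call to the subscriber, unregister on unsubscribe.

```javascript
// Simplified stand-in for fromEventPattern (illustration only).
function miniFromEventPattern(addHandler, removeHandler) {
  return {
    subscribe(next) {
      // handler is the bridge: the callback API calls it, it forwards to the subscriber.
      const handler = (...args) => next(args.length === 1 ? args[0] : args);
      const retValue = addHandler(handler); // registration happens on subscribe
      // The returned function plays the role of unsubscribe().
      return () => {
        if (typeof removeHandler === 'function') removeHandler(handler, retValue);
      };
    },
  };
}

// Hypothetical callback registry standing in for createSomething.
const listeners = new Set();
const source = miniFromEventPattern(
  h => { listeners.add(h); },
  h => { listeners.delete(h); },
);

const seen = [];
const unsubscribe = source.subscribe(value => seen.push(value));
listeners.forEach(h => h(1));     // a single argument is passed through as-is
listeners.forEach(h => h(2, 3));  // several arguments arrive as one array
unsubscribe();
listeners.forEach(h => h(4));     // registry is empty now, nothing is called
console.log(JSON.stringify(seen)); // [1,[2,3]]
```

Note that nothing is registered until `subscribe` is called, which is also why the real observable does nothing until you subscribe to it.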


You can just pipe the operator throttleTime to a fromEvent stream.

<pre class="lang-js prettyprint-override">import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

const mouseMove$ = fromEvent(document, 'mousemove');

mouseMove$.pipe(throttleTime(300)).subscribe(...callback);</pre>

I'm not really familiar with RxJS but something like the following is possible.

Check it out on StackBlitz: https://stackblitz.com/edit/rxjs-ijzehs.

<pre class="lang-js prettyprint-override">import { Subject, interval } from 'rxjs';
import { throttle } from 'rxjs/operators';

function myCallback(v) {
  // do things with event, something computationally intensive
  console.log("got value", v)
}

const o$ = new Subject()
o$.pipe(throttle(v => interval(300))).subscribe(myCallback)

const something = createSomething({
  onSomethingHappened: v => {
    o$.next(v)
  }
})

function createSomething({ onSomethingHappened }) {
  let i = 0
  setInterval(() => {
    console.log("calling onSomethingHappened")
    onSomethingHappened(i++)
  }, 100)
}</pre>

You can use a simple Subject:

<pre class="lang-js prettyprint-override">import { Subject } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

function myCallback(event) {
  // do things with event, something computationally intensive
}

// the subject receives every event.
const subject = new Subject();
const subscription = subject.pipe(
  throttleTime(300), // throttle takes a duration selector function; throttleTime takes milliseconds.
).subscribe(myCallback);

const something = createSomething({
  onSomethingHappened: e => subject.next(e), // using subject instead of callback.
})

// subscription.unsubscribe(); // once we don't need it.</pre>
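If it helps to see what a 300 ms throttle does to a burst of calls, here is a hand-rolled sketch in plain JavaScript. `throttleLeading` is a hypothetical helper, not an RxJS function; it only approximates what throttleTime does with its default settings (let the first event through, drop the rest until the window closes). The clock is passed in so the example runs deterministically without timers.

```javascript
// Leading-edge throttle: let an event through, then ignore events for `ms`.
// `now` is injectable so the behaviour can be simulated without real timers.
function throttleLeading(fn, ms, now = Date.now) {
  let silentUntil = -Infinity;
  return event => {
    const t = now();
    if (t < silentUntil) return; // still inside the silent window: drop the event
    silentUntil = t + ms;
    fn(event);
  };
}

let clock = 0;
const handled = [];
const throttled = throttleLeading(e => handled.push(e), 300, () => clock);

// Simulate one event every 100 ms for one second.
for (clock = 0; clock < 1000; clock += 100) {
  throttled(clock);
}
console.log(handled); // [ 0, 300, 600, 900 ]
```

So with events arriving every 100 ms, the expensive callback runs for roughly one event in three.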


