Combining one observable with the latest value from another observable

I'm trying to combine two observables whose values share some key.

I want to produce a new value whenever the first observable produces a new value, combined with the latest value from a second observable, whose selection depends on the latest value from the first observable (that is, the latest value with a matching key).

Pseudocode example:

    var obs1 = Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(x => Tuple.Create(SomeKeyThatVaries, x));
    var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(1))
        .Select(x => Tuple.Create(SomeKeyThatVaries, x));

    from x in obs1
    let latestFromObs2WhereKeyMatches = …
    select Tuple.Create(x, latestFromObs2WhereKeyMatches)

Any suggestions?

Clearly this could be implemented by subscribing to the second observable and building a dictionary of the latest values, indexed by key. But I'm looking for a different approach.

Usage scenario: one minute price bars computed from a stream of stock quotes. In this case the key is the ticker and the dictionary contains latest ask and bid prices for concrete tickers, which are then used in the computation.

(By the way, thank you, Dave and James; this has been a very fruitful discussion.)

(sorry about the formatting, hard to get right on an iPad..)


...why are you looking for a different approach? Sounds like you are on the right lines to me. It's short, simple code... roughly speaking it will be something like:

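    // Latest obs2 value per key, updated as obs2 produces values.
    var cache = new ConcurrentDictionary<long, long>();
    obs2.Subscribe(x => cache[x.Item1] = x.Item2);

    // For each obs1 value, look up the latest obs2 value with the same key
    // (0 if obs2 has not produced a value for that key yet).
    var results = obs1.Select(x => new
    {
        obs1 = x.Item2,
        obs2 = cache.TryGetValue(x.Item1, out var latest) ? latest : 0
    });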

At the end of the day, C# is an OO language and the heavy lifting of the thread-safe mutable collections is already all done for you.

There may be a fancy Rx approach (feels like joins might be involved)... but how maintainable will it be? And how will it perform?



I'd like to know the purpose of such a query. Would you mind describing the usage scenario a bit?

Nevertheless, it seems like the following query may solve your problem. The initial projections aren't necessary if you already have some way of identifying the origin of each value, but I've included them for the sake of generalization, to be consistent with your extremely abstract mode of questioning. ;-)

Note: I'm assuming that someKeyThatVaries is not shared data as you've shown it, which is why I've also included the term anotherKeyThatVaries; otherwise, the entire query really makes no sense to me.

    var obs1 = Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(x => Tuple.Create(someKeyThatVaries, x));
    var obs2 = Observable.Interval(TimeSpan.FromSeconds(.25))
        .Select(x => Tuple.Create(anotherKeyThatVaries, x));

    var results =
        obs1.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 1 })
            .Merge(
                obs2.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 2 }))
            .GroupBy(t => t.Key, t => new { t.Value, t.Kind })
            .SelectMany(g =>
                g.Scan(
                    new { X = -1L, Y = -1L, Yield = false },
                    (acc, cur) => cur.Kind == 1
                        ? new { X = cur.Value, Y = acc.Y, Yield = true }
                        : new { X = acc.X, Y = cur.Value, Yield = false })
                 .Where(s => s.Yield)
                 .Select(s => Tuple.Create(s.X, s.Y)));
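To make the Scan step in the query above easier to follow, here is a minimal sketch that replays the same accumulator over a plain list for a single key group, without Rx. The `ScanDemo` class, the `Pair` helper and the sample events are made up for illustration; only the seed of -1 and the Kind 1 / Kind 2 convention come from the query.

```csharp
using System;
using System.Collections.Generic;

public static class ScanDemo
{
    // Replays the accumulator logic for ONE key group.
    // Each event is (Kind, Value): Kind 1 = value from obs1, Kind 2 = value from obs2.
    public static List<Tuple<long, long>> Pair(IEnumerable<Tuple<int, long>> events)
    {
        var results = new List<Tuple<long, long>>();
        long x = -1L, y = -1L; // same seed as the Scan: nothing seen yet

        foreach (var e in events)
        {
            if (e.Item1 == 1)
            {
                // New obs1 value: yield it together with the latest obs2 value.
                x = e.Item2;
                results.Add(Tuple.Create(x, y));
            }
            else
            {
                // New obs2 value: remember it, yield nothing.
                y = e.Item2;
            }
        }

        return results;
    }

    public static void Main()
    {
        // Hypothetical interleaving of events that share one key.
        var events = new[]
        {
            Tuple.Create(1, 0L),  // obs1 fires before any obs2 value: paired with the seed -1
            Tuple.Create(2, 10L),
            Tuple.Create(2, 11L),
            Tuple.Create(1, 1L),  // paired with 11, the latest obs2 value
            Tuple.Create(2, 12L),
            Tuple.Create(1, 2L),  // paired with 12
        };

        foreach (var pair in Pair(events))
            Console.WriteLine("{0} -> {1}", pair.Item1, pair.Item2);
    }
}
```

Note the first pair: if obs1 produces a value for a key before obs2 has produced one, it is paired with the seed value -1, so you may want to filter those out or pick a different seed.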

