我是ReactiveX的相对新手,并且已经了解了Rx.Observable.take和Rx.Observable.takeLast分别从序列的开头和结尾获取,并且Rx.Observable.windowWithCount从原始的observable中获取可能重叠的窗口。 为了好玩,我想完全使用反应式运算符和换能器编写FFT算法。 有些算法对端口很直观,但有些算法很难用流建模。 具体来说,一个rfft作用于一个序列的开始和结束值,我不知道该怎么做。 更具体地说,如果我有:
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]它将被分解为可观察的窗口:
[[0,1,14,15],[2,3,12,13],[4,5,10,11],[6,7,8,9]]有没有一种优雅的方式来做到这一点,任何可观察的序列?
I am a relative newbie to ReactiveX and have learned about Rx.Observable.take and Rx.Observable.takeLast to take from the beginning and end of a sequence respectively and Rx.Observable.windowWithCount to take potentially overlapping windows from an original observable. For fun, I'd like to write the FFT algorithm entirely using reactive operators and transducers. Some of the algorithm is intuitive to port, but some is difficult to model with streams. Specifically, an rfft acts upon the beginning and end values of a sequence, which I can't figure out how to do. More specifically, if I have:
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]it would be broken up into observable windows of:
[[0,1,14,15],[2,3,12,13],[4,5,10,11],[6,7,8,9]]Is there an elegant way to do this for any arbitrary observable sequence?
最满意答案
我不得不说,我认为使用有限流的反应不是一个好主意,因为你没有任何事件可以对任何背压做出反应。 你必须知道流的长度并且它是有限的。 最好的解决方案是使用带有o(1)的数组。 尽管如此,这是一个可能的解决方案,它将使用许多cpu周期。 我使用RxJava2-RC5进行测试。
@Test public void ttfAlgo() throws Exception { Observable<Integer> ascendingOrdered = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .concatWith(Observable.just(11, 12, 13, 14, 15)); Observable<Integer> descendingOrdered = ascendingOrdered.sorted((t1, t2) -> { return t2.compareTo(t1); }); Observable.zip(ascendingOrdered.window(2), descendingOrdered.window(2), (w1, w2) -> { return w1.concatWith(w2.sorted()); }).flatMap(window -> { System.out.println("windowX"); return window; }).subscribe(integer -> { System.out.println(integer); }); Thread.sleep(1_000); }I have to say, that I don't think it is a good idea to use reactive with finite streams, because you do not have any events to react onto or any backpressure. You do have to know about the stream length and that it finite. The best solution would be using arrays with o(1). Nevertheless here is a possible solution, which would use many cpu cycles. I use RxJava2-RC5 for testing.
@Test public void ttfAlgo() throws Exception { Observable<Integer> ascendingOrdered = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .concatWith(Observable.just(11, 12, 13, 14, 15)); Observable<Integer> descendingOrdered = ascendingOrdered.sorted((t1, t2) -> { return t2.compareTo(t1); }); Observable.zip(ascendingOrdered.window(2), descendingOrdered.window(2), (w1, w2) -> { return w1.concatWith(w2.sorted()); }).flatMap(window -> { System.out.println("windowX"); return window; }).subscribe(integer -> { System.out.println(integer); }); Thread.sleep(1_000); }更多推荐
发布评论