RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Overview

RxJava: Reactive Extensions for the JVM

codecov.io Maven Central

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 3.x (Javadoc)

  • single dependency: Reactive-Streams
  • Java 8+ (Android desugar friendly)
  • Java 8 lambda-friendly API
  • fixed API mistakes and many limits of RxJava 2
  • intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
  • non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.)
  • async or synchronous execution
  • virtual time and schedulers for parameterized concurrency
  • test and diagnostic support via test schedulers, test consumers and plugin hooks

Learn more about RxJava in general on the Wiki Home.

ℹ️ Please read the What's different in 3.0 for details on the changes and migration information when upgrading from 2.x.

Version 2.x

The 2.x version is end-of-life as of February 28, 2021. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 2.2.21, will remain accessible.

Version 1.x

The 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible.

Getting started

Setting up the dependency

The first step is to include RxJava 3 into your project, for example, as a Gradle compile dependency:

implementation "io.reactivex.rxjava3:rxjava:3.x.y"

(Please replace x and y with the latest version numbers: Maven Central )

Hello World

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

Note that RxJava 3 components now live under io.reactivex.rxjava3 and the base classes and interfaces live under io.reactivex.rxjava3.core.

Base classes

RxJava 3 features several base classes you can discover operators on:

Some terminology

Upstream, downstream

The dataflows in RxJava consist of a source, zero or more intermediate steps followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

Here, if we imagine ourselves on operator2, looking to the left towards the source is called the upstream. Looking to the right towards the subscriber/consumer is called the downstream. This is often more apparent when each element is written on a separate line:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion

In RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated to the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.

Assembly time

The preparation of dataflows by applying various intermediate operators happens in the so-called assembly time:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

At this point, the data is not flowing yet and no side-effects are happening.

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println)

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.

Runtime

This is the state when the flows are actively emitting items, errors or completion signals:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Simple background computation

One of the common use cases for RxJava is to run some computation, network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.

Schedulers

RxJava operators don't work with Threads or ExecutorServices directly but with so-called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 3 features several standard schedulers accessible via Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

These are available on all JVM platforms but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

In addition, there is an option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).

The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in this example situations lets you see the output of the flow on the console with time to spare.

Concurrency within a flow

Flows in RxJava are sequential in nature split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.

Parallel processing

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, runs them and merges the computed squares.

Note, however, that flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows "at once" but the output flow will be in the order those inner flows were created.

Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.

Dependent

The most typical scenario is to given a value, invoke another service, await and continue with its result:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap for example:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

Here, the original value will be available inside the inner flatMap, courtesy of lambda variable capture.

Non-dependent

In other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with a quasi independent another source. Here, flatMap works as well:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

however, the continuation in this case stays Observable instead of the likely more appropriate Single. (This is understandable because from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well).

Often though there is a way that is somewhat more expressive (and also lower overhead) by using Completable as the mediator and its operator andThen to resume with something else:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

The only dependency between the sourceObservable and the someSingleSource is that the former should complete normally in order for the latter to be consumed.

Deferred-dependent

Sometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time when the dataflow hasn't even run yet. We need something that defers the evaluation of this Single source until runtime when the main source completes:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

or

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversions

Sometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, getDemandAsync could return a Single<DemandRecord>. If the code example is left unchanged, this will result in a compile-time error (however, often with a misleading error message about lack of overload).

In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type.

Converting to the desired type

Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Observable toFlowable2 first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Single toFlowable3 toObservable toMaybe ignoreElement
Maybe toFlowable3 toObservable toSingle ignoreElement
Completable toFlowable toObservable toSingle toMaybe

1: When turning a multi-valued source into a single-valued source, one should decide which of the many source values should be considered as the result.

2: Turning an Observable into Flowable requires an additional decision: what to do with the potential unconstrained flow of the source Observable? There are several strategies available (such as buffering, dropping, keeping the latest) via the BackpressureStrategy parameter or via standard Flowable operators such as onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest which also allow further customization of the backpressure behavior.

3: When there is only (at most) one source item, there is no problem with backpressure as it can be always stored until the downstream is ready to consume.

Using an overload with the desired type

Many frequently used operator has overloads that can deal with the other types. These are usually named with the suffix of the target type:

Operator Overloads
flatMap flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMap concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMap switchMapSingle, switchMapMaybe, switchMapCompletable

The reason these operators have a suffix instead of simply having the same name with different signature is type erasure. Java doesn't consider signatures such as operator(Function<T, Single<R>>) and operator(Function<T, Maybe<R>>) different (unlike C#) and due to erasure, the two operators would end up as duplicate methods with the same signature.

Operator naming conventions

Naming in programming is one of the hardest things as names are expected to be not long, expressive, capturing and easily memorable. Unfortunately, the target language (and pre-existing conventions) may not give too much help in this regard (unusable keywords, type erasure, type ambiguities, etc.).

Unusable keywords

In the original Rx.NET, the operator that emits a single item and then completes is called Return(T). Since the Java convention is to have a lowercase letter start a method name, this would have been return(T) which is a keyword in Java and thus not available. Therefore, RxJava chose to name this operator just(T). The same limitation exists for the operator Switch, which had to be named switchOnNext. Yet another example is Catch which was named onErrorResumeNext.

Type erasure

Many operators that expect the user to provide some function returning a reactive type can't be overloaded because the type erasure around a Function<T, X> turns such method signatures into duplicates. RxJava chose to name such operators by appending the type as suffix as well:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

Even though certain operators have no problems from type erasure, their signature may turn up being ambiguous, especially if one uses Java 8 and lambdas. For example, there are several overloads of concatWith taking the various other reactive base types as arguments (for providing convenience and performance benefits in the underlying implementation):

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Both Publisher and SingleSource appear as functional interfaces (types with one abstract method) and may encourage users to try to provide a lambda expression:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

Unfortunately, this approach doesn't work and the example does not print 2 at all. In fact, since version 2.1.10, it doesn't even compile because at least 4 concatWith overloads exist and the compiler finds the code above ambiguous.

The user in such situations probably wanted to defer some computation until the someSource has completed, thus the correct unambiguous operator should have been defer:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

Sometimes, a suffix is added to avoid logical ambiguities that may compile but produce the wrong type in a flow:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

This can get also ambiguous when functional interface types get involved as the type argument T.

Error handling

Dataflows can fail, at which point the error is emitted to the consumer(s). Sometimes though, multiple sources may fail at which point there is a choice whether or not wait for all of them to complete or fail. To indicate this opportunity, many operator names are suffixed with the DelayError words (while others feature a delayError or delayErrors boolean flag in one of their overloads):

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

Of course, suffixes of various kinds may appear together:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

The base classes can be considered heavy due to the sheer number of static and instance methods on them. RxJava 3's design was heavily influenced by the Reactive Streams specification, therefore, the library features a class and an interface per each reactive type:

Type Class Interface Consumer
0..N backpressured Flowable Publisher1 Subscriber
0..N unbounded Observable ObservableSource2 Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

1The org.reactivestreams.Publisher is part of the external Reactive Streams library. It is the main type to interact with other reactive libraries through a standardized mechanism governed by the Reactive Streams specification.

2The naming convention of the interface was to append Source to the semi-traditional class name. There is no FlowableSource since Publisher is provided by the Reactive Streams library (and subtyping it wouldn't have helped with interoperation either). These interfaces are, however, not standard in the sense of the Reactive Streams specification and are currently RxJava specific only.

R8 and ProGuard settings

By default, RxJava itself doesn't require any ProGuard/R8 settings and should work without problems. Unfortunately, the Reactive Streams dependency since version 1.0.3 has embedded Java 9 class files in its JAR that can cause warnings with the plain ProGuard:

Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher

It is recommended one sets up the following -dontwarn entry in the application's proguard-ruleset file:

-dontwarn java.util.concurrent.Flow*

For R8, the RxJava jar includes the META-INF/proguard/rxjava3.pro with the same no-warning clause and should apply automatically.

Further reading

For further details, consult the wiki.

Communication

Versioning

Version 3.x is in development. Bugfixes will be applied to both 2.x and 3.x branches, but new features will only be added to 3.x.

Minor 3.x increments (such as 3.1, 3.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 3.x.y increments (such as 3.0.0 -> 3.0.1, 3.3.1 -> 3.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an @Beta or @Experimental annotation can also be added in the patch releases to allow rapid exploration and iteration of unstable new functionality.

@Beta

APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

@Deprecated

APIs marked with the @Deprecated annotation at the class or method level will remain supported until the next major release but it is recommended to stop using them.

io.reactivex.rxjava3.internal.*

All code inside the io.reactivex.rxjava3.internal.* packages are considered private API and should not be relied upon at all. It can change at any time.

Full Documentation

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Gradle:

implementation 'io.reactivex.rxjava3:rxjava:x.y.z'

and for Maven:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex.rxjava3" name="rxjava" rev="x.y.z" />

Snapshots

Snapshots are available via https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}

JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot

Build

To build:

$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build

Further details on building can be found on the Getting Started page of the wiki.

Bugs and Feedback

For bugs, questions and discussions please use the Github Issues.

LICENSE

Copyright (c) 2016-present, RxJava Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Issues
  • Idiomatic Scala Support

    Idiomatic Scala Support

    As of version 0.11.0 Scala support is provided through the use of implicits. Conversations on Twitter are bringing up other possible improvements. Let's use this issue to discuss.

    opened by benjchristensen 96
  • 2.x: Handling null values

    2.x: Handling null values

    With the upcoming RxJava2 release one of the important changes is that null is no longer accepted as a stream element.

    Honestly, I have mixed feelings about this change and part of me understands that it will enforce clean APIs, but I can see a number of use cases when this might be a problem.

    For instance, in my app I have an in-memory cache:

    @Nullable CacheItem findCacheItem(long id);
    

    CacheItem might not be present in cache, so method might return null value.

    The way it is used with Rx* - is as following:

    Observable<CacheItem> getStream(final long id) {
        return Observable.fromCallable(new Callable<CacheItem>() {
            @Override public CacheItem call() throws Exception {
                return findCacheItem(id);
            }
        });
    }
    

    So with this approach, I might get null in my stream which is totally valid situation, so it is handled properly on receiving side - let's say UI changes its state if item is not present in cache:

    Observable.just(user)
              .map(user -> user.getName())
              .map(name -> convertNameToId(name))
              .flatMap(id -> getStream(id))
              .map(cacheItem -> getUserInfoFromCacheItem(cacheItem))
              .subscribe(
                  userInfo -> {
                      if(userInfo != null) showUserInfo();
                      else showPrompt();
                  }
              );
    

    With RxJava2 I am no longer allowed to post null down the stream, so I either need to wrap my CacheItem into some other class and make my stream produce that wrapper instead or make quite big architectural changes.

    Wrapping every single stream element into nullable counterpart doesn't look right to me.

    Am I missing something fundamental here?

    It seems like the situation like mine is quite popular, so Im curious what is the recommended strategy to tackle this problem given new "no null" policy in RxJava2?

    Question 
    opened by paveldudka 89
  • OnSubscribeRedo - fix race conditions

    OnSubscribeRedo - fix race conditions

    While searching for the cause of #2863 I bumped into this race condition (which doesn't fix #2863):

    If a request is made between L238 and L239 then consumerCapacity may become Long.MAX_VALUE on arriving at L239 in which case we don't wish to decrement it. To fix, used compareAndSet.

    What is interesting about this fix is that in the test loop of 5000 in OperatorRetryTest I see many more occurrences of the failure on average (3 -> 50) presumably because the extra time to perform the compareAndSet action has expanded the window for the race condition causing the failures.

    Bug 
    opened by davidmoten 83
  • Observer reference is not released on unsubscribe

    Observer reference is not released on unsubscribe

    With the following code http://pastebin.com/VUZ5aApe and using the latest version of rxjava (0.18.4),

    rotations should result in the activity calling unsubscribe on my subscription.

    This should release the reference to my observer, which should in turn release the reference to my activity.

    However, the memory tests I've done show that as I rotate the phone, my activity count keeps going up, which means the reference to my activity is not being released.

    (let me know if the pastebin expires for some reason, I'll get something back up)

    opened by fdoyle 74
  • Roadmap to 1.0

    Roadmap to 1.0

    I want everyone to know where we're heading and what's left before we hit 1.0.

    From the beginning we have allowed ourselves to do breaking changes on each 0.x release as we were aware that the design was not finished when we started this project. We are getting close to being done. Our goal is to release 1.0 in the coming months after stabilizing the API so that it can be relied upon without breaking every couple months.

    Project Structure

    When we hit 1.0 we intend on splitting out the language adaptors into their own top-level projects such as RxScala, RxClojure, RxGroovy, RxKotlin, RxJRuby etc.

    This will allow each project to iterate as needed at their own pace, especially since some will need to continue iterating while the RxJava core stabilizes. For example, if RxScala needs breaking changes it can bump it's major version while RxJava does not. This is particularly important to RxScala for handling changes such as Scala 2.10 -> 2.11 -> 2.12 etc.

    Major contrib modules will also be moved out, such as RxAndroid which also needs its own life-cycle.

    Outstanding Work

    The major items of work to be finished before 1.0 are:

    • ~~Backpressure: https://github.com/Netflix/RxJava/issues/1000~~ [Completed]
    • ~~Serialization Behavior: https://github.com/Netflix/RxJava/issues/998~~ [Completed in new merge implementation]
    • ~~Scheduler API: https://github.com/Netflix/RxJava/issues/997~~ Completed
    • ~~Remove all deprecated methods/classes~~ [Completed] #1053 #1621
    • ~~Finish migrating all operators to using lift and chaining the Subscription via Subscriber (current priority)~~ [Completed]

    The primary goal is to nail down the public API. New functionality can come in 1.1, 1.2, etc. The secondary goal is for all operators to work as advertised (regarding unsubscribe, back pressure and non-blocking). There will always be bugs, that's why 1.x.y will still be active after release, but the desire is to not need to ship 2.x soon after 1.x as this is a low level library that once entrenched becomes hard to migrate (we create significant pain at Netflix on each 0.x release).

    Going Forward

    Please comment if you feel there are other critical things to achieve before 1.0. The fastest way to getting us to 1.0 is helping us achieve the work stated above.

    Information 
    opened by benjchristensen 59
  • Version 0.17.0 Release Notes [Preview]

    Version 0.17.0 Release Notes [Preview]

    #0.17.0 Release Notes

    Version 0.17.0 contains some significant signature changes that allow us to significantly improve handling of synchronous Observables and simplify Schedulers. Many of the changes have backwards compatible deprecated methods to ease the migration while some are breaking.

    The new signatures related to Observable in this release are:

    // A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
    public final static <T> Observable<T> create(OnSubscribe<T> f)
    
    // The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
    public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
    
    // Subscriber is an Observer + Subscription
    public abstract class Subscriber<T> implements Observer<T>, Subscription
    
    // The main `subscribe` behavior receives a Subscriber instead of Observer
    public final Subscription subscribe(Subscriber<? super T> observer)
    
    // Subscribing with an Observer however is still appropriate
    // and the Observer is automatically converted into a Subscriber
    public final Subscription subscribe(Observer<? super T> observer)
    
    // A new 'lift' function allows composing Operator implementations together
    public <R> Observable<R> lift(Func1<Subscriber<? super R>, Subscriber<? super T>> lift)
    
    

    Also changed is the Scheduler interface which is much simpler:

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.

    As shown in the code above the changes fall into 2 major sections:

    1) Lift/OnSubscribe/Subscriber

    Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.

    2) Schedulers

    Simplification of the Scheduler interface and make clearer the concept of "outer" and "inner" Schedulers for recursion.

    Lift/OnSubscribe/Subscriber

    New types Subscriber and OnSubscribe along with the new lift operator have been added. The reasons and benefits are as follows:

    1) Synchronous Unsubscribe

    RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Observer<? super Integer> Observer) {
            for (int i = 1; i < 1000000; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
    

    Subscribing to this Observable will always emit all 1,000,000 values even if unsubscribed such as via oi.take(10).

    Version 0.17.0 fixes this issue by injecting the Subscription into the OnSubscribe function to allow code like this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // we now receive a Subscriber instead of Observer
            for (int i = 1; i < 1000000; i++) {
                // the OnSubscribe can now check for isUnsubscribed
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    
    });
    

    Subscribing to this will now correctly only emit 10 onNext and unsubscribe:

    // subscribe with an Observer
    oi.take(10).subscribe(new Observer<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            println("Received: " + t);
        }
    
    })
    

    Or the new Subscriber type can be used and the Subscriber itself can unsubscribe:

    // or subscribe with a Subscriber which supports unsubscribe
    oi.subscribe(new Subscriber<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            println("Received: " + t);
            if(t >= 10) {
                // a Subscriber can unsubscribe
                this.unsubscribe();
            }
        }
    
    })
    

    2) Custom Operator Chaining

    Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable is using static methods. This has meant code like this:

    MyCustomerOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
    

    In reality we want:

    observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
    

    Using the newly added lift we can get quite close to this:

    observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
    

    Here is how the proposed lift method looks if all operators were applied with it:

    Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
    

    Along with the lift function comes a new Operator signature:

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
    

    All operator implementations in the rx.operators package will over time be migrated to this new signature.

    3) Simpler Operator Implementations

    The lift operator injects the necessary Observer and Subscription instances (via the new Subscriber type) and eliminates (for most use cases) the need for manual subscription management. Because the Subscription is available in-scope there are no awkward coding patterns needed for creating a Subscription, closing over it and returning and taking into account synchronous vs asynchronous.

    For example, the body of fromIterable is simply:

    public void call(Subscriber<? super T> o) {
        for (T i : is) {
            if (o.isUnsubscribed()) {
                return;
            }
            o.onNext(i);
        }
        o.onCompleted();
    }
    

    The take operator is:

    public final class OperatorTake<T> implements Operator<T, T> {
    
        final int limit;
    
        public OperatorTake(int limit) {
            this.limit = limit;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> o) {
            CompositeSubscription parent = new CompositeSubscription();
            if (limit == 0) {
                o.onCompleted();
                parent.unsubscribe();
            }
            return new Subscriber<T>(parent) {
    
                int count = 0;
                boolean completed = false;
    
                @Override
                public void onCompleted() {
                    if (!completed) {
                        o.onCompleted();
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    if (!completed) {
                        o.onError(e);
                    }
                }
    
                @Override
                public void onNext(T i) {
                    if (!isUnsubscribed()) {
                        o.onNext(i);
                        if (++count >= limit) {
                            completed = true;
                            o.onCompleted();
                            unsubscribe();
                        }
                    }
                }
    
            };
        }
    
    }
    

    4) Recursion/Loop Performance

    The fromIterable use case is 20x faster when implemented as a loop instead of recursive scheduler (see https://github.com/Netflix/RxJava/commit/a18b8c1a572b7b9509b7a7fe1a5075ce93657771).

    Several places we can remove recursive scheduling used originally for unsubscribe support and use a loop instead.

    Schedulers

    Schedulers were greatly simplified to a design based around Action1<Inner>.

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This design change originated from three findings:

    1. It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.

    To solve this the new design explicitly has the outer Scheduler and then Scheduler.Inner for recursion.

    1. The passing of state is not useful since scheduling over network boundaries with this model does not work.

    In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler that attempted to use observeOn to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been bound to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.

    1. The number of overloads with different ways of doing the same things were confusing.

    This new design removes all but the essential and simplest methods.

    1. A scheduled task could not do work in a loop and easily be unsubscribed which generally meant less efficient recursive scheduling.

    This new design applies similar principles as done with lift/create/OnSubscribe/Subscriber and injects the Subscription via the Inner interface so a running task can check isUnsubscribed().

    WIth this new design, the simplest execution of a single task is:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
        }
    
    });
    

    Recursion is easily invoked:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
            inner.schedule(this);
        }
    
    });
    

    The use of Action1<Inner> on both the outer and inner levels makes it so recursion that refer to this and it works easily.

    Similar to the new lift/create pattern with Subscriber the Inner is also a Subscription so it allows efficient loops with unsubscribe support:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    

    An action can now unsubscribe the Scheduler.Inner:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                int i = doOtherWork();
                if(i > 100) {
                    // an Action can cause the Scheduler to unsubscribe and stop
                    inner.unsubscribe();
                }
            }
        }
    
    });
    

    Typically just stopping is sufficient:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            }
        }
    
    });
    

    but if other work in other tasks is being done and you want to unsubscribe conditionally you could:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            } else {
                inner.unsubscribe();
            }
        }
    
    });
    

    and the recursion can be delayed:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed ... but delay the recursion
            inner.schedule(this, 500, TimeUnit.MILLISECONDS);
        }
    
    });
    

    The methods on the Inner never return a Subscription because they are always a single thread/event-loop/actor/etc and controlled by the Subscription returned by the initial Scheduler.schedule method. This is part of clarifying the contract.

    Thus an unsubscribe controlled from the outside would be done like this:

    Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    
    // unsubscribe from outside
    s.unsubscribe();
    

    Migration Path

    1) Lift/OnSubscribe/Subscriber

    The lift function will not be used by most and is additive so will not affect backwards compatibility. The Subscriber type is also additive and for most use cases does not need to be used directly, the Observer interface can continue being used.

    The previous create(OnSubscribeFunc f) signature has been deprecated so code will work but now have warnings. Please begin migrating code as this will be deleted prior to the 1.0 release.

    Code such as this:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(Observer<? super Integer> o) {
            o.onNext(1);
            o.onNext(2);
            o.onCompleted();
            return Subscriptions.empty();
        }
    });
    

    should change to this:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onCompleted();
        }
    });
    

    If concurrency was being injected:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(final Observer<? super Integer> o) {
            final BooleanSubscription s = new BooleanSubscription();
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (s.isUnsubscribed()) {
                        o.onNext(i++);
                    }
                }
    
            });
            t.start();
            return s;
        }
    });
    

    you may no longer need it and can implement like this instead:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    });
    

    or can just simplify the Subscription management:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
            t.start();
        }
    });
    

    or use a Scheduler:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Schedulers.io().schedule(new Action1<Inner>() {
    
                @Override
                public void call(Inner inner) {
                    int i = 0;
                    while (subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
        }
    });
    

    or use subscribeOn which now works to make synchronous Observables async while supporting unsubscribe (this didn't work before):

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    }).subscribeOn(Schedulers.newThread());
    

    2) Schedulers

    Custom Scheduler implementations will need to be re-implemented and any direct use of the Scheduler interface will also need to be updated.

    3) Subscription

    If you have custom Subscription implementations you will see they now need an isUnsubscribed() method.

    You can either add this method, or wrap your function using Subscriptions.create and it will handle the isUnsubscribed behavior and execute your function when unsubscribe() is called.

    The Future...

    This is hopefully the last of the major refactors to rxjava-core and we're approaching version 1.0. We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create/subscribe signatures as we want and the Subscription and Scheduler interfaces are now clean.

    We need to improve on some of the Subject implementations still, particularly ReplaySubject. We are beginning to focus after this release on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.

    We appreciate your usage, feedback and contributions and hope the library is creating value for you!

    opened by benjchristensen 57
  • Proposed Scheduler Interface Change for 0.18 (yes, again)

    Proposed Scheduler Interface Change for 0.18 (yes, again)

    Reviewing the Scheduler interface changes of 0.17 with @headinthebox revealed that we're not 100% happy with the outcome, particularly after learning that Java 8 does not allow referencing this from within a lambda.

    The Scheduler interface as of 0.17 is:

    class Scheduler {
        public abstract Subscription schedule(Action1<Scheduler.Inner> action);
        public abstract Subscription schedule(Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
        public Subscription scheduleRecursive(Action1<Recurse> action);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public int degreeOfParallelism();
        public long now();
    
        public static final class Recurse {
            public void schedule() {
            public void schedule(long delay, TimeUnit unit) {
        }
    
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    We have determined two problems with this:

    1. Inner/Outer Dance

    In practice we have found that usage is always one of two things, either you just interact with the outer and don't care about the Inner, or you immediately need the Inner and have to do an awkward first scheduling just to get access to the Inner. (See here and weep.)

    1. Recursion

    The Action1<Scheduler.Inner> signature was chosen and put on both outer and inner so that an inner class could refer to itself using this to simply reschedule itself from the outer onto the inner.

    It was assumed this would work in Java 8 lambdas but unfortunately we did not prove it.

    This works with anonymous classes:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            System.out.println("do stuff");
            // recurse
            inner.schedule(this);
        }
    
    });
    

    but this does not with lambdas:

    Schedulers.newThread().schedule((inner) -> {
        System.out.println("do stuff");
        inner.schedule(this); // doesn't compile
    });
    

    So we end up with this:

    Schedulers.newThread().scheduleRecursive((recurse) -> {
        System.out.println("do stuff");
        recurse.schedule();
    });
    

    At that point it's clear that Inner is not working well and we have Recurse to fix the problem.

    Thus, the proposed changes (breaking again) are:

    class Scheduler {
        public final Subscription schedule(Action1<Recurse> action);
        public final Subscription schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        public final Subscription schedulePeriodically(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit);
        public abstract Inner createInner(); // for advanced use cases like `observeOn`
        public int degreeOfParallelism();
        public long now();
    
        // now the primary interface
        public static final class Recurse {
            public final void schedule();
            public final void schedule(long delay, TimeUnit unit);
            public final void schedule(Action1<Recurse> action);
            public final void schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        }
    
        // now mostly an implementation detail except for advanced use cases
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Recurse> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Recurse> action);
            public long now();
        }
    }
    

    The name of Recurse is up for debate. It may be possible to merge Recurse and Inner but I haven't figured it out yet. The reason is that Inner is a single instance representing a thread or event-loop whereas Recurse represents an Action or work. Thus a given Inner could have multiple Recurse actions scheduled on to it. It is being an Action that allows it to recurse by invoking schedule() that just reschedules itself.

    This would make it better support Java 8 lambdas and simply recursion, while also better supporting (via the createInner() method) the more complicated use cases like observeOn where current code is very awkward.

    This needs to be the last refactor of this so we nail it down and stop breaking things and can get to 1.0.

    Let the discussion begin ...

    opened by benjchristensen 55
  • Profiling Memory Usage and Object Creation

    Profiling Memory Usage and Object Creation

    We need to spend time profiling memory and object allocation and finding places where we can improve.

    I would really appreciate help diving into this and finding problem areas. Even if you don't fix them, but just identity use cases, operators, etc that would be very valuable.

    This is partly a result of the fact that in Netflix production we have seen an increase in YoungGen GCs since 0.17.x.

    The areas to start should probably be:

    • Observable.create
    • Observable.lift
    • Subscriber
    • CompositeSubscription
    • map
    • flatMap

    If you can or want to get involved in this please comment here so we all can collaborate together.

    opened by benjchristensen 54
  • Experimental Proposal of rx.Task

    Experimental Proposal of rx.Task

    Adds rx.Task as a "scalar Observable" for representing work with a single return value.

    See https://github.com/ReactiveX/RxJava/issues/1594 rx.Future/Task

    This provides a type similar to Future in that it represents a scalar unit of work, but it is lazy like an Observable and many Tasks can be combined into an Observable stream. Note how Task.zip returns Task<R> whereas Task.merge returns Observable<R>.

    NOTE: This is for experimentation and feedback at this time.

    Items requiring review and work that I'm particularly aware of:

    • naming of OnExecute
    • naming of TaskObserver (this one in particular I don't like)
    • design and implementation of Task.Promise
    • should the public lift use the Observable.Operator or should that only be for internal reuse?
    • should we have a public lift that uses a Task.Operator?
    • the Task.toObservable implementation right now is efficient but will likely break something so it likely needs to change to use subscribe
    • implementation of this merge variant: Task<T> merge(final Task<? extends Task<? extends T>> source)
    • several operators currently just wrap as Observable to reuse existing operators ... is that okay performance wise?
    • Javadocs

    Examples of using this class:

    import rx.Observable;
    import rx.Task;
    import rx.Task.Promise;
    
    public class TaskExamples {
    
        public static void main(String... args) {
            // scalar synchronous value
            Task<String> t1 = Task.create(t -> {
                t.onSuccess("Hello World!");
            });
    
            // scalar synchronous value using helper method
            Task<Integer> t2 = Task.just(1);
    
            // synchronous error
            Task<String> error = Task.create(t -> {
                t.onError(new RuntimeException("failed!"));
            });
    
            // executing
            t1.subscribe(System.out::println);
            t2.subscribe(System.out::println);
            error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));
    
            // scalar Tasks for request/response like a Future
            getData(1).subscribe(System.out::println);
            getDataUsingPromise(2).subscribe(System.out::println);
    
            // combining Tasks into another Task
            Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);
    
            // combining Tasks into an Observable stream
            Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
            Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));
    
            zipped.subscribe(v -> System.out.println("zipped => " + v));
            merged.subscribe(v -> System.out.println("merged => " + v));
            mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
        }
    
        /**
         * Example of an async scalar execution using Task.create
         * <p>
         * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
         *
         * @param arg
         * @return
         */
        public static Task<String> getData(int arg) {
            return Task.create(s -> {
                new Thread(() -> {
                    try {
                        Thread.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // deliver value
                        s.onSuccess("Data=" + arg);
                    }).start();
            });
        }
    
        /**
         * Example of an async scalar execution using a Task.Promise
         * <p>
         * This shows how an eager (hot) process would work like using a Future.
         *
         * @param arg
         * @return
         */
        public static Task<String> getDataUsingPromise(int arg) {
            Task.Promise<String> p = Promise.create();
    
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    p.onSuccess("Data=" + arg);
                }).start();
    
            return p.getTask();
        }
    }
    
    Enhancement 
    opened by benjchristensen 50
  • Adding super/extends so that Observable is covariant

    Adding super/extends so that Observable is covariant

    Ok, so this pull request changes a lot of lines. It's mostly generalizing all the FuncXs to be used like FuncX[-T1, -T2, ..., -TX, +R] (contravariant parameters, covariant return type) and all the Observers to be used "in a contravariant way". A few of the Observable uses are covariant, now, too (mostly zip).

    This is the pull request for #326.

    This doesn't look very good in the code (thanks Java). Also, it doesn't seem to make Scala interop easier at all (at least not yet).

    Please take a look. I'm not exactly happy with the result. - Maybe I'm doing something wrong here? - I've still got hope that there's an easier way...

    The pull request compiles and tests ok for me (except for the Clojure module, but that's another story and not due to my changes).

    opened by jmhofer 46
  • UnicastSubjectTest > fusedNoConcurrentCleanDueToCancel FAILED

    UnicastSubjectTest > fusedNoConcurrentCleanDueToCancel FAILED

    io.reactivex.rxjava3.subjects.UnicastSubjectTest > fusedNoConcurrentCleanDueToCancel FAILED
        org.junit.runners.model.TestTimedOutException: test timed out after 5 minutes
            at io.reactivex.rxjava3.subjects.UnicastSubject.drainFused(UnicastSubject.java:416)
            at io.reactivex.rxjava3.subjects.UnicastSubject.drain(UnicastSubject.java:464)
            at io.reactivex.rxjava3.subjects.UnicastSubject.onNext(UnicastSubject.java:319)
            at io.reactivex.rxjava3.subjects.UnicastSubjectTest.fusedNoConcurrentCleanDueToCancel(UnicastSubjectTest.java:481)
    
    Test-Failures 3.x Investigating 
    opened by akarnokd 0
  • FlowableConcatMapSchedulerTest > mapperDelayError2ScheduledLong failed

    FlowableConcatMapSchedulerTest > mapperDelayError2ScheduledLong failed

    io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapSchedulerTest > mapperDelayError2ScheduledLong FAILED
        java.lang.AssertionError: Not completed (latch = 1, values = 1, errors = 0, completions = 0, timeout!, disposed!)
            at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:125)
            at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:178)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapSchedulerTest.mapperDelayError2ScheduledLong(FlowableConcatMapSchedulerTest.java:1093)
    
    Test-Failures 3.x Investigating 
    opened by akarnokd 0
  • `Flowable#groupBy` race leads to a back-pressure issue

    `Flowable#groupBy` race leads to a back-pressure issue

    Hi!

    While debugging https://github.com/reactor/reactor-core/issues/2352 we wanted to check whether RxJava has the same issue since, given the history of both projects :)

    Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):

    final int total = 100;
    
    Long count = Flowable.range(0, total)
                         .groupBy(i -> (i / 2) * 2)
                         .flatMapMaybe(Flowable::firstElement, false, 1)
                         .observeOn(Schedulers.io())
                         .count()
                         .blockingGet();
    assertThat(total - count).as("count").isZero();
    

    Gives (not 100% reliably, consider running in "rerun until failure" mode):

    io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
    
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)
    

    A few interesting observations:

    1. Changing observeOn's buffer size to 131 and higher makes it always pass
    2. 130 would sometimes fail with Unable to emit a new group (#99) due to lack of requests
    3. 129 would sometimes fail with Unable to emit a new group (#98) due to lack of requests
    4. 128 would sometimes fail with Unable to emit a new group (#97) due to lack of requests
    5. etc etc

    So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.

    Question 3.x 
    opened by bsideup 4
  • 3.x: recursive concat causes StackOverflowError

    3.x: recursive concat causes StackOverflowError

    Originally posted on StackOverflow.

    The following code crashes with StackOverflowError and the stacktrace shows a long chain of request calls.

    import io.reactivex.rxjava3.core.Flowable
    import io.reactivex.rxjava3.core.Single
    import io.reactivex.rxjava3.schedulers.Schedulers
    import java.util.concurrent.TimeUnit.SECONDS
    
    fun main() {
        fun incr(n: Int): Single<Int> = Single.just(n + 1)
    
        fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
            if (n < max)
                incr(n)
                .observeOn(Schedulers.single())
                .toFlowable()
                .concatMap { next -> numbers(next, max) }
            else
                Flowable.empty()
        )
    
        numbers(1, 10_000)
        .blockingForEach(::println)
    }
    
    Exception in thread "main" java.lang.StackOverflowError
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    

    I'm not sure why there is such a chain created and if this is a result of an RxJava bug or not.

    3.x Investigating 
    opened by akarnokd 5
  • reactivex.github.io needs a new administrator

    reactivex.github.io needs a new administrator

    I'm no longer able to devote time to maintaining the reactivex.github.io site and its associated ReactiveX repo. There are some outstanding PRs that need review/merging, but in general there hasn't been a whole lot of activity there over the past months, so this is probably not a particularly demanding position.

    opened by DavidMGross 1
  • 3.x: parallel performs poorly with 10+ parallelism

    3.x: parallel performs poorly with 10+ parallelism

    For some reason, the parallel Scrabble benchmark performs poorly when the parallelism level is 10+, for example, on my i7 8700 CPU (6 cores/12 threads):

    image

    However, my older i7 4770K processor (4 cores/8 threads) shows no such performance degradation. ~~Neither does the reactive-streams-commons implementation (the parent of RxJava's parallel implementation) with parallelism=12.~~ Correction: The Rsc benchmark was pinned to 8 threads and actually shows a similar inefficiency with 10+.

    Performance 3.x 
    opened by akarnokd 3
  • Update wiki to reflect 3.x (tracking issue)

    Update wiki to reflect 3.x (tracking issue)

    This is the overview of the suggested/planned changes to the Wiki. Text in bold are for extra considerations/options.

    Documentation Discussion PR welcome 3.x 
    opened by akarnokd 22
  • 3.x: Performance improvements (tracking issue)

    3.x: Performance improvements (tracking issue)

    I assembled a fresh evaluation of RxJava 1, 2 and Reactor 3.2 performance under various usages and found the following deficiencies in RxJava 2.

    • [x] Flowable.empty() appears to be consuming a lot of memory.
    • [ ] See if a non-trampolining Schedulers.from could improve async/pipeline performance.
    • [ ] Check why Observable.blockingLast has 30% more overhead than its Flowable counterpart.
    • [ ] Check why there is a lot more overhead with ReplayProcessor and ReplaySubject compared to v1.
    • [ ] Check why there is a lot more overhead with UnicastProcessor and UnicastSubject compared to v1.
    • [ ] Check why Observable.fromArray is somewhat slower with longer sources than Flowable.
    • [ ] Check why Observable.fromIterable is slower with longer sources than Flowable.
    • [ ] Optimize Observable.concatMap for scalar inner sources.
    • [ ] Check why Flowable.flatMapIterable is drastically slower than Observable and Reactor's version in just/range/crossmap scenarios.
    Performance PR welcome 3.x 
    opened by akarnokd 5
  • 3.x: Flowable missing or incorrect marbles (tracking issue)

    3.x: Flowable missing or incorrect marbles (tracking issue)

    • [x] amb: indicate cancellation and request propagation
    • [x] ambArray: indicate cancellation and request propagation
    • [ ] combineLatest + 4: missing diagram
    • [ ] combineLatestDelayError + 5: missing diagram
    • [ ] concatArrayDelayError: operator name, indicate error delayed
    • [x] concatArrayEager + 1: missing diagram
    • [x] concatArrayEagerDelayError + 1: missing diagram
    • [ ] concatDelayError + 2: missing diagram
    • [x] concatEager + 4: missing diagram
    • [ ] concatEagerDelayError + 4: missing diagram
    • [ ] create: missing diagram
    • [ ] error(Callable): indicate callback that creates the Throwable
    • [ ] `error(Throwable): indicate parameter
    • [ ] fromArray: operator name, indicate items inside the box
    • [ ] fromCallable: indicate item is the result of the callback inside the box
    • [ ] fromFuture + 3: operator name
    • [ ] fromIterable: operator name, indicate items inside the box
    • [ ] fromPublisher: missing diagram
    • [ ] generate + 4: missing diagram
    • [ ] intervalRange + 1: missing diagram
    • [ ] just + 8: indicate item inside the box (as many as there are sources, so 1..9)
    • [ ] mergeArray + 1: operator name
    • [ ] mergeArrayDelayError + 1: operator name
    • [ ] mergeDelayError + 1: indicate inner sources are generated
    • [ ] sequenceEqual + 3: indicate cancellation after the mismatch has been found
    • [ ] switchOnNextDelayError + 1: operator name, indicate error delay
    • [ ] using + 1: indicate actual Flowable and items
    • [ ] zipArray: operator name
    • [ ] zipIterable: operator name
    • [ ] all: should return Single
    • [x] ambWith: indicate cancellation and request propagation
    • [ ] exists: should return Single
    • [x] ~~as: missing diagram~~: operator removed
    • [ ] blockingFirst + 1: missing diagram
    • [ ] blockingForEach: operator name
    • [ ] blockingIterable + 1: operator name, indicate iterator call to hasNext() and next()
    • [ ] blockingLast + 1: operator name
    • [ ] blockingLatest: missing diagram
    • [ ] blockingMostRecent: operator name
    • [ ] blockingNext: operator name
    • [ ] blockingSingle + 1: operator name
    • [x] toFuture: rework diagram to indicate Future
    • [ ] blockingSubscribe + 4: missing diagram
    • [ ] buffer(..., Callable) + 6: indicate custom collection
    • [ ] buffer(open, close) + 1: parameter name in the box
    • [ ] collect: indicate initial container supplier, should return Single
    • [ ] collectInto: indicate initial container, should return Single, operator name
    • [ ] compose: missing diagram
    • [ ] concatMap(prefetch): indicate prefetch effects through requests
    • [ ] concatMapDelayError + 1: missing diagram
    • [ ] concatMapEager + 1: missing diagram
    • [ ] concatMapEagerDelayError + 1: missing diagram
    • [ ] concatMapIterable + 1: missing diagram
    • [ ] concatWith: operator name
    • [ ] contains: should return Single
    • [ ] count: operator name
    • [ ] defaultIfEmpty: indicate non-empty case
    • [ ] delaySubscription(Publisher): missing diagram
    • [ ] dematerialize: indicate dematerializing a terminal notification cancels the source
    • [ ] doFinally: missing diagram
    • [ ] doAfterNext: missing diagram
    • [ ] doOnCancel: operator name, unsubscribe -> cancel in text
    • [ ] doOnComplete: indicate events pass through
    • [ ] doOnEach + 1: indicate events pass through
    • [ ] doOnError: indicate events pass through
    • [ ] doOnNext: indicate events pass through
    • [ ] doOnRequest: missing diagram
    • [ ] doOnTerminate: indicate events pass through, show both error and complete case
    • [ ] elementAt: should return Maybe, indicate if source is shorter than index
    • [ ] elementAtOrDefault: should return Single, indicate if source has that element
    • [ ] elementAtOrError: should return Single, indicate error case
    • [ ] firstElement: image aspect off, add case separator dashed vertical line
    • [ ] first: image aspect off
    • [ ] firstOrError: combine two sections, vertical separator, empty source arrow
    • [ ] flatMap(..., delayError): indicate error delay case
    • [ ] flatMap(..., maxConcurrency) + 2: missing diagram
    • [ ] flatMap(Function, Function, Callable, int): missing diagram
    • [ ] flatMap(Function, BiFunction, boolean, int) + 1: missing diagram
    • [ ] flatMap(Function, BiFunction, int): missing diagram
    • [ ] flatMapCompletable + 1: missing diagram
    • [ ] flatMapIterable + 1: image aspect looks off
    • [ ] flatMapMaybe + 1: missing diagram
    • [ ] flatMapSingle + 1: missing diagram
    • [ ] forEach: missing diagram
    • [ ] forEachWhile + 2: missing diagram
    • [ ] hide: missing diagram
    • [ ] isEmpty: should return Single
    • [ ] lastElement: operator name, should return Maybe
    • [ ] last(T): should return Single, operator name
    • [ ] lastOrError: combine sections into one, add vertical case separator
    • [x] limit: ~~missing diagram~~ operator renamed to take
    • [ ] mergeWith: operator name, indicate the second source as parameter
    • [x] onErrorResumeNext(Function): indicate error function
    • [ ] onErrorReturn: indicate error function
    • [ ] onErrorReturnItem: operator name
    • [ ] onTerminateDetach: missing diagram
    • [ ] publish(Function) + 1: rework diagram
    • [ ] rebatchRequests: missing diagram
    • [ ] reduce(seed, ...): indicate star as parameter
    • [ ] reduceWith(Callable,...): indicate star as a supplied value, operator name
    • [ ] repeat + 1: indicate resubscription
    • [ ] repeatUntil: indicate boolean supplier
    • [ ] replay + 7: remove publish box
    • [ ] replay(Function) + 7: rework diagram
    • [ ] retry(): indicate infinte resubscription
    • [ ] retry(BiPredicate): indicate function decision on retry
    • [ ] retry(long): indicate retry count effect
    • [ ] retry(long, Predicate): indicate predicate effects
    • [ ] retry(Predicate): indicate predicate effects
    • [ ] retryUntil: missing diagram
    • [ ] scan(R, ...): indicate initialValue as parameter
    • [ ] scan(Callable, ...): indicate initialValue is the result of a call
    • [ ] share: indicate the source gets cancelled
    • [ ] singleElement: operator name, should return Maybe
    • [ ] single: operator name, should return Single
    • [ ] singleOrError: combine cases into one box, add vertical case separators
    • [ ] skipUntil: indicate until cancels the main source
    • [ ] sorted + 1: missing diagram
    • [ ] startWith(Iterable): indicate iterable parameter, indicate subscription
    • [ ] startWith(Publisher): indicate subscription after the start element(s)
    • [ ] startWith(T): one start value, indicate subscription after the start element
    • [ ] startWithArray: operator name, indicate subscription after the start element(s)
    • [ ] switchMapDelayError + 1: operator name, indicate error is delayed
    • [ ] takeUntil(Predicate): indicate source is cancelled
    • [ ] takeUntil(Publisher): indicate source is cancelled
    • [ ] takeWhile: indicate source is cancelled
    • [ ] to: missing diagram
    • [ ] toList + 1: should return Single
    • [ ] toList(Callable): should return Single, indicate custom collection
    • [ ] toMap + 2: should return Single
    • [ ] toMultimap + 3: should return Single
    • [ ] toObservable: missing diagram
    • [ ] toSortedList + 4: should return Single
    • [ ] unsubscribeOn: missing diagram
    • [ ] withLatestFrom(s1, s2, ...): missing diagrams
    • [ ] withLatestFrom(Publisher[], ...): missing diagram
    • [ ] withLatestFrom(Iterable, ...): missing diagram
    Documentation PR welcome 3.x 
    opened by akarnokd 5
  • Javadoc mistakes to look for

    Javadoc mistakes to look for

    From this comment:

    • parameter names change in the code but the javadocs still refer to the old names
    • potential exceptions change in the code and the javadocs aren't updated to match
    • missing backpressure / scheduler / since-annotations, and outdated information in each of these
    • changing things from consonant- to vowel-sound-starting, doing a search-and-replace, and not replacing "a" with "an" or vice-versa to match. Also, sometimes changing singulars to plurals or vice-versa by search-and-replace, which causes its own set of grammatical headaches.
    • if you see any documentation that uses the passive voice, you can almost always make it clearer and less prone to misinterpretation by rewriting it in the active voice
    • URLs for marble diagrams are copy-and-pasted, the name of the operator is changed, but the width/height values in the img tag are not, so that the right image appears but in the wrong dimensions (there were a couple of these in the javadocs last I looked)
    Documentation 3.x 
    opened by akarnokd 4
Releases(v3.1.1)
  • v3.1.1(Aug 30, 2021)

    Maven JavaDocs

    API promotions

    • The operator fusion-related interfaces and two atomic queue implementations have been promoted to standard, thus officially supported in the io.reactivex.rxjava3.operators package. (#7320)

    Bugfixes

    • Specify proper OSGi unique bundle symbolic name of io.reactivex.rxjava3.rxjava. (#7319)
    • Fix ExecutorScheduler initializing Schedulers prematurely when using RxJavaPlugins.createExecutorScheduler. (#7323)
    • Fix the LamdbaConsumerIntrospection of Completable's lambda-based observer to use the same missing onError indicator as the other types' lambda-based consumers. (#7326)
    Source code(tar.gz)
    Source code(zip)
  • v3.1.0(Aug 9, 2021)

    Maven JavaDocs

    :warning: With this release, the minimum required Android API level is API 21 (Android 5.0).

    :warning: Note that the 3.0.x patch line won't be developed further.

    API promotions

    • Flowable.onBackpressureReduce() + 1 (#7296)
    • RxJavaPlugins.getOnParallelSubscribe() and RxJavaPlugins.setOnParallelSubscribe() (#7296)
    • TestScheduler([...] boolean useOnScheduleHook) (#7296)

    API additions

    • subscribe([...], DisposableContainer) for better Disposable management and reference cleanup. (#7298)
    • RxJavaPlugins.createExecutorScheduler() for creating an Executor-based Scheduler before the Schedulers class (and thus the standard schedulers) gets initialized. (#7306)

    Behavior changes

    • The scheduler purge thread has been removed. Removing cancelled timed operations is now managed by the setRemoveOnCancelPolicy of the underlying ScheduledExecutorService. (#7293)

    Documentation

    • Fixed wording of the fair parameter of Schedulers.from. (#7301)
    • Update withLatestFrom javadoc about upstream early complete (#7289)

    Other

    • @NonNull annotations on generic type arguments were made consistent across. (#7302, #7303)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.13(Jun 1, 2021)

    Maven JavaDocs

    :warning: RxJava is now signed with a new private key. The new public key fingerprint is 1D9AA7F9E1E2824728B8CD1794B291AEF984A085.

    Documentation

    • Fix wording of *OnSubscribe interfaces (#7274)

    Other

    • Mitigated the security risks caused by the Codecov backdoor (#7237).
    • Improve the build process (#7225, #7253, #7255, #7257, #7258, #7260, #7261, #7262, #7264, #7263)
    • Upgrade to Gradle 7.0 (#7259)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.13-RC5(May 26, 2021)

  • v3.0.13-RC4(May 3, 2021)

  • v3.0.13-RC3(Apr 26, 2021)

  • v3.0.13-RC2(Apr 17, 2021)

  • v3.0.13-RC1(Apr 17, 2021)

  • v3.0.12(Apr 8, 2021)

    Maven JavaDocs

    Bugfix

    • CompositeException.printStackTrace to write directly into PrintStream/PrintWriter. (#7212)

    Documentation

    • Fix wrong reference in Single.flattenStreamAsObservable javadoc. (#7206)
    • Fix style violating Javadoc. (#7210)

    Other

    • Fix POM_URL (#7214)
    • Upgrade Gradle to 6.8.3 (#7208)
    • Bump gradle to 6.8.3 & optimize gradle config (#7207)
    • Added Javadoc checks to Checkstyle. Fix violating Javadoc. (#7210)
    • Modernize gradle plugin block, change maven to maven-publish (#7219)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.12-RC1(Apr 8, 2021)

  • v3.0.11(Mar 6, 2021)

    Maven JavaDocs

    ℹ️ RxJava 2 is now end-of-life (EOL) and no further development or support will be provided by the project.

    Enhancement

    • Add onSubscribe hook to ParallelFlowable operators (#7191)

    Bugfix

    • Allow Single.zip and Maybe.zip result to be garbage collected (#7196)
    • Direct scheduling via Schedulers.from to honor the interruptibleWorker setting (#7203)

    Documentation

    • Fix typos in Schedulers.java (#7178)

    Other

    • Release to Sonatype directly (#7181)
    • Upgrade to Gradle 6.8.2 (#7184)
    • Cleanup of source code headers (#7205)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC5(Feb 16, 2021)

  • v3.0.11-RC4(Feb 13, 2021)

  • v2.2.21(Feb 13, 2021)

    Maven JavaDocs

    :warning: This is the last planned update for the 2.x version line. After February 28, 2021, 2.x becomes End-of-Life (EoL); no further patches, bugfixes, enhancements, documentation or support will be provided by the project.

    Enhancements

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7162)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7170)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC3(Feb 5, 2021)

  • v3.0.11-RC2(Feb 5, 2021)

    Specify the staging profile name to be "io.reactivex" so the close operation finds the repo.

    Unfortunately, there is no other way to test the release process.

    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC1(Feb 5, 2021)

  • v3.0.10(Feb 1, 2021)

    Maven JavaDocs

    Enhancement

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7160)
    • Add TestScheduler option to use onSchedule hook. (#7163)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7169)
    • Add fusion support to concatMap{Maybe|Single|Completable}. (#7165)

    Documentation

    • Update marbles of amb(), ambArray() and ambWith() (#7144)
    • Fix take() mentioning the old limit() operator (#7145)
    • Document Schedulers.from vs. RejectedExecutionException behavior. (#7150)
    • Update documentation for NewThreadWorker.scheduleActual method. (#7164)
    • Improve Javadocs style of Schedulers. (#7168)

    Other

    • onReduceBackpressure internals cleanup (#7151)
    • Workaround for FutureTask.toString recursion on JDK 10+. (#7173)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.9(Dec 30, 2020)

  • v3.0.8(Dec 2, 2020)

  • v3.0.8-RC3(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.8-RC2(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.8-RC1(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.7(Oct 7, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Observable.toFlowable(ERROR) not cancelling on MissingBackpressureException. (#7083)
    • Fix Flowable.concatMap backpressure with scalars. (#7089)

    Documentation

    • fromRunnable/fromAction javadoc improvements. (#7071)
    • Patch out duplicate @NonNull annotation in generated javadocs. (#7073)
    • Clarify the documentation for scan operators. (#7093)
    Source code(tar.gz)
    Source code(zip)
  • v2.2.20(Oct 6, 2020)

    Maven JavaDocs

    :warning: The 2.x version line is now in maintenance mode and will be supported only through bugfixes until February 28, 2021. No new features, behavior changes or documentation adjustments will be accepted or applied to 2.x. It is recommended to migrate to 3.x within this time period.

    Bugfixes

    • Fix Observable.flatMap with maxConcurrency hangs (#6960)
    • Fix Observable.toFlowable(ERROR) not cancelling upon MissingBackpressureException (#7084)
    • Fix Flowable.concatMap backpressure with scalars. (#7091)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.6(Aug 20, 2020)

  • v3.0.5(Jul 31, 2020)

  • v3.0.4(May 21, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Flowable.groupBy eviction logic double decrement and hang. (#6975)
    • Fix Flowable.groupBy cancellation/cleanup/eviction race hangs. (#6979)
    • Disable fusion on the groups of Flowable.groupBy. (#6983)
    • Fix Flowable.groupBy eviction-completion-replenishment problems. (#6988)
    • Removed unnecessary upstream.cancel() call for casually finished upstream sequences. (#6992)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.3(May 1, 2020)

    Maven JavaDocs

    Enhancements

    • Allow setting the drift tolerance timeunit via system property rx3.scheduler.drift-tolerance-unit. (#6969)

    Bugfixes

    • Fix scheduled tasks' fatal exception behavior. (#6956)

    Documentation

    • Quick Javadoc fixes. (#6943)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.2(Apr 6, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Observable.flatMap with maxConcurrency hangs. (#6946)

    Documentation

    • Add see annotation for range operators. (#6934)
    • Update images and their JavaDocs URLs to non-transparent version. (#6944)
    Source code(tar.gz)
    Source code(zip)
Owner
ReactiveX
Reactive Extensions for Async Programming
ReactiveX
Reactive Streams Utilities - Future standard utilities library for Reactive Streams.

Reactive Streams Utilities This is an exploration of what a utilities library for Reactive Streams in the JDK might look like. Glossary: A short gloss

Lightbend 61 May 27, 2021
Implementation of various string similarity and distance algorithms: Levenshtein, Jaro-winkler, n-Gram, Q-Gram, Jaccard index, Longest Common Subsequence edit distance, cosine similarity ...

java-string-similarity A library implementing different string similarity and distance measures. A dozen of algorithms (including Levenshtein edit dis

Thibault Debatty 2.4k Sep 18, 2021
Reactive Programming for Android

Reactive Programming for Android Agera is a set of classes and interfaces to help write functional, asynchronous, and reactive applications for Androi

Google 7.3k Sep 15, 2021
An advanced, but easy to use, platform for writing functional applications in Java 8.

Getting Cyclops X (10) The latest version is cyclops:10.4.0 Stackoverflow tag cyclops-react Documentation (work in progress for Cyclops X) Integration

AOL 1.2k Sep 15, 2021
Micro second messaging that stores everything to disk

Chronicle Queue Contents Table of Contents Contents About Chronicle Software What is Chronicle Queue Java Docs Usage More benchmarks Downloading Chron

Chronicle Software : Open Source 2.4k Sep 15, 2021
Library for creating In-memory circular buffers that use direct ByteBuffers to minimize GC overhead

Overview This project aims at creating a simple efficient building block for "Big Data" libraries, applications and frameworks; thing that can be used

Tatu Saloranta 130 Sep 11, 2021
🔥Android懒人框架,基于谷歌最新AAC架构,MVVM设计模式,组件化开发的一套快速开发库,整合Okhttp+RxJava+Retrofit+Glide等主流模块,满足日常开发需求。使用该框架可以快速开发一个高质量、易维护的Android应用

MvvmLazy Android懒人框架 目前,android流行的MVC、MVP模式的开发框架很多,然而一款基于MVVM模式开发框架却很少。 个人搜寻了市面上大量的开源框架,秉承减少重复造轮子的原则,汲取了各位大神的框架优点,集成了大量常用的开源框架和工具类,进行了部分公用模块封装,丰富了Bind

rui 15 Sep 10, 2021
A Java implementation of Transducers

transducers-java Transducers are composable algorithmic transformations. They are independent from the context of their input and output sources and s

null 113 Oct 25, 2020
Zero-dependency Reactive Streams publishers library

⚡️ Mutiny Zero: a zero-dependency Reactive Streams publishers library for Java Mutiny Zero is a minimal API for creating reactive-streams compliant pu

SmallRye 10 Aug 27, 2021
Zero-allocation hashing for Java

Zero-Allocation Hashing Version Overview This project provides a Java API for hashing any sequence of bytes in Java, including all kinds of primitive

Chronicle Software : Open Source 603 Sep 9, 2021
Lightweight threads for Java, with message passing, nio, http and scheduling support.

Kilim: Continuations, Fibers, Actors and message passing for the JVM

Sriram Srinivasan 1.6k Sep 14, 2021
Comparison between Java and Common Lisp solutions to a phone-encoding problem described by Prechelt

Prechelt Phone Number Encoding This project implements the phone number encoding described by Lutz Prechelt in his article for the COMMUNICATIONS OF T

Renato Athaydes 15 Aug 17, 2021
LMDB for Java

LMDB JNI LMDB JNI provide a Java API to LMDB which is an ultra-fast, ultra-compact key-value embedded data store developed by Symas for the OpenLDAP P

deephacks 199 Aug 13, 2021
A high performance caching library for Java

Caffeine is a high performance, near optimal caching library. For more details, see our user's guide and browse the API docs for the latest release. C

Ben Manes 10.4k Sep 11, 2021
RTree2D is a 2D immutable R-tree with STR (Sort-Tile-Recursive) packing for ultra-fast nearest and intersection queries

RTree2D RTree2D is a 2D immutable R-tree with STR (Sort-Tile-Recursive) packing for ultra-fast nearest and intersection queries. Goals Main our requir

Andriy Plokhotnyuk 101 Sep 1, 2021
gRPC and protocol buffers for Android, Kotlin, and Java.

Wire “A man got to have a code!” - Omar Little See the project website for documentation and APIs. As our teams and programs grow, the variety and vol

Square 3.6k Sep 16, 2021
Java large off heap cache

OHC - An off-heap-cache Features asynchronous cache loader support optional per entry or default TTL/expireAt entry eviction and expiration without a

Robert Stupp 617 Sep 10, 2021
LWJGL is a Java library that enables cross-platform access to popular native APIs useful in the development of graphics (OpenGL, Vulkan), audio (OpenAL), parallel computing (OpenCL, CUDA) and XR (OpenVR, LibOVR) applications.

LWJGL - Lightweight Java Game Library 3 LWJGL (https://www.lwjgl.org) is a Java library that enables cross-platform access to popular native APIs usef

Lightweight Java Game Library 3.4k Sep 9, 2021
Rolling hash functions in Java

Rolling hash functions in Java License: Apache 2.0 What is this? This is a set of Java classes implementing various recursive n-gram hashing technique

Daniel Lemire 71 Jul 25, 2021