RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Overview

RxJava: Reactive Extensions for the JVM

codecov.io Maven Central

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 3.x (Javadoc)

  • single dependency: Reactive-Streams
  • Java 8+ (Android desugar friendly)
  • Java 8 lambda-friendly API
  • fixed API mistakes and many limits of RxJava 2
  • intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
  • non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.)
  • async or synchronous execution
  • virtual time and schedulers for parameterized concurrency
  • test and diagnostic support via test schedulers, test consumers and plugin hooks

Learn more about RxJava in general on the Wiki Home.

ℹ️ Please read the What's different in 3.0 for details on the changes and migration information when upgrading from 2.x.

Version 2.x

The 2.x version is end-of-life as of February 28, 2021. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 2.2.21, will remain accessible.

Version 1.x

The 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible.

Getting started

Setting up the dependency

The first step is to include RxJava 3 into your project, for example, as a Gradle compile dependency:

implementation "io.reactivex.rxjava3:rxjava:3.x.y"

(Please replace x and y with the latest version numbers: Maven Central )

Hello World

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

Note that RxJava 3 components now live under io.reactivex.rxjava3 and the base classes and interfaces live under io.reactivex.rxjava3.core.

Base classes

RxJava 3 features several base classes you can discover operators on:

Some terminology

Upstream, downstream

The dataflows in RxJava consist of a source, zero or more intermediate steps followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

Here, if we imagine ourselves on operator2, looking to the left towards the source is called the upstream. Looking to the right towards the subscriber/consumer is called the downstream. This is often more apparent when each element is written on a separate line:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion

In RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated to the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.

Assembly time

The preparation of dataflows by applying various intermediate operators happens in the so-called assembly time:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

At this point, the data is not flowing yet and no side-effects are happening.

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println)

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.

Runtime

This is the state when the flows are actively emitting items, errors or completion signals:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Simple background computation

One of the common use cases for RxJava is to run some computation, network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.

Schedulers

RxJava operators don't work with Threads or ExecutorServices directly but with so-called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 3 features several standard schedulers accessible via Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

These are available on all JVM platforms but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

In addition, there is an option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).

The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in this example situations lets you see the output of the flow on the console with time to spare.

Concurrency within a flow

Flows in RxJava are sequential in nature split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.

Parallel processing

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, runs them and merges the computed squares.

Note, however, that flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows "at once" but the output flow will be in the order those inner flows were created.

Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.

Dependent

The most typical scenario is to given a value, invoke another service, await and continue with its result:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap for example:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

Here, the original value will be available inside the inner flatMap, courtesy of lambda variable capture.

Non-dependent

In other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with a quasi independent another source. Here, flatMap works as well:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

however, the continuation in this case stays Observable instead of the likely more appropriate Single. (This is understandable because from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well).

Often though there is a way that is somewhat more expressive (and also lower overhead) by using Completable as the mediator and its operator andThen to resume with something else:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

The only dependency between the sourceObservable and the someSingleSource is that the former should complete normally in order for the latter to be consumed.

Deferred-dependent

Sometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time when the dataflow hasn't even run yet. We need something that defers the evaluation of this Single source until runtime when the main source completes:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

or

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversions

Sometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, getDemandAsync could return a Single<DemandRecord>. If the code example is left unchanged, this will result in a compile-time error (however, often with a misleading error message about lack of overload).

In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type.

Converting to the desired type

Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Observable toFlowable2 first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Single toFlowable3 toObservable toMaybe ignoreElement
Maybe toFlowable3 toObservable toSingle ignoreElement
Completable toFlowable toObservable toSingle toMaybe

1: When turning a multi-valued source into a single-valued source, one should decide which of the many source values should be considered as the result.

2: Turning an Observable into Flowable requires an additional decision: what to do with the potential unconstrained flow of the source Observable? There are several strategies available (such as buffering, dropping, keeping the latest) via the BackpressureStrategy parameter or via standard Flowable operators such as onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest which also allow further customization of the backpressure behavior.

3: When there is only (at most) one source item, there is no problem with backpressure as it can be always stored until the downstream is ready to consume.

Using an overload with the desired type

Many frequently used operator has overloads that can deal with the other types. These are usually named with the suffix of the target type:

Operator Overloads
flatMap flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMap concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMap switchMapSingle, switchMapMaybe, switchMapCompletable

The reason these operators have a suffix instead of simply having the same name with different signature is type erasure. Java doesn't consider signatures such as operator(Function<T, Single<R>>) and operator(Function<T, Maybe<R>>) different (unlike C#) and due to erasure, the two operators would end up as duplicate methods with the same signature.

Operator naming conventions

Naming in programming is one of the hardest things as names are expected to be not long, expressive, capturing and easily memorable. Unfortunately, the target language (and pre-existing conventions) may not give too much help in this regard (unusable keywords, type erasure, type ambiguities, etc.).

Unusable keywords

In the original Rx.NET, the operator that emits a single item and then completes is called Return(T). Since the Java convention is to have a lowercase letter start a method name, this would have been return(T) which is a keyword in Java and thus not available. Therefore, RxJava chose to name this operator just(T). The same limitation exists for the operator Switch, which had to be named switchOnNext. Yet another example is Catch which was named onErrorResumeNext.

Type erasure

Many operators that expect the user to provide some function returning a reactive type can't be overloaded because the type erasure around a Function<T, X> turns such method signatures into duplicates. RxJava chose to name such operators by appending the type as suffix as well:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

Even though certain operators have no problems from type erasure, their signature may turn up being ambiguous, especially if one uses Java 8 and lambdas. For example, there are several overloads of concatWith taking the various other reactive base types as arguments (for providing convenience and performance benefits in the underlying implementation):

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Both Publisher and SingleSource appear as functional interfaces (types with one abstract method) and may encourage users to try to provide a lambda expression:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

Unfortunately, this approach doesn't work and the example does not print 2 at all. In fact, since version 2.1.10, it doesn't even compile because at least 4 concatWith overloads exist and the compiler finds the code above ambiguous.

The user in such situations probably wanted to defer some computation until the someSource has completed, thus the correct unambiguous operator should have been defer:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

Sometimes, a suffix is added to avoid logical ambiguities that may compile but produce the wrong type in a flow:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

This can get also ambiguous when functional interface types get involved as the type argument T.

Error handling

Dataflows can fail, at which point the error is emitted to the consumer(s). Sometimes though, multiple sources may fail at which point there is a choice whether or not wait for all of them to complete or fail. To indicate this opportunity, many operator names are suffixed with the DelayError words (while others feature a delayError or delayErrors boolean flag in one of their overloads):

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

Of course, suffixes of various kinds may appear together:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

The base classes can be considered heavy due to the sheer number of static and instance methods on them. RxJava 3's design was heavily influenced by the Reactive Streams specification, therefore, the library features a class and an interface per each reactive type:

Type Class Interface Consumer
0..N backpressured Flowable Publisher1 Subscriber
0..N unbounded Observable ObservableSource2 Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

1The org.reactivestreams.Publisher is part of the external Reactive Streams library. It is the main type to interact with other reactive libraries through a standardized mechanism governed by the Reactive Streams specification.

2The naming convention of the interface was to append Source to the semi-traditional class name. There is no FlowableSource since Publisher is provided by the Reactive Streams library (and subtyping it wouldn't have helped with interoperation either). These interfaces are, however, not standard in the sense of the Reactive Streams specification and are currently RxJava specific only.

R8 and ProGuard settings

By default, RxJava itself doesn't require any ProGuard/R8 settings and should work without problems. Unfortunately, the Reactive Streams dependency since version 1.0.3 has embedded Java 9 class files in its JAR that can cause warnings with the plain ProGuard:

Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher

It is recommended one sets up the following -dontwarn entry in the application's proguard-ruleset file:

-dontwarn java.util.concurrent.Flow*

For R8, the RxJava jar includes the META-INF/proguard/rxjava3.pro with the same no-warning clause and should apply automatically.

Further reading

For further details, consult the wiki.

Communication

Versioning

Version 3.x is in development. Bugfixes will be applied to both 2.x and 3.x branches, but new features will only be added to 3.x.

Minor 3.x increments (such as 3.1, 3.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 3.x.y increments (such as 3.0.0 -> 3.0.1, 3.3.1 -> 3.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an @Beta or @Experimental annotation can also be added in the patch releases to allow rapid exploration and iteration of unstable new functionality.

@Beta

APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

@Deprecated

APIs marked with the @Deprecated annotation at the class or method level will remain supported until the next major release but it is recommended to stop using them.

io.reactivex.rxjava3.internal.*

All code inside the io.reactivex.rxjava3.internal.* packages are considered private API and should not be relied upon at all. It can change at any time.

Full Documentation

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Gradle:

implementation 'io.reactivex.rxjava3:rxjava:x.y.z'

and for Maven:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex.rxjava3" name="rxjava" rev="x.y.z" />

Snapshots

Snapshots are available via https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}

JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot

Build

To build:

$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build

Further details on building can be found on the Getting Started page of the wiki.

Bugs and Feedback

For bugs, questions and discussions please use the Github Issues.

LICENSE

Copyright (c) 2016-present, RxJava Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Issues
  • Idiomatic Scala Support

    Idiomatic Scala Support

    As of version 0.11.0 Scala support is provided through the use of implicits. Conversations on Twitter are bringing up other possible improvements. Let's use this issue to discuss.

    opened by benjchristensen 96
  • 2.x: Handling null values

    2.x: Handling null values

    With the upcoming RxJava2 release one of the important changes is that null is no longer accepted as a stream element.

    Honestly, I have mixed feelings about this change and part of me understands that it will enforce clean APIs, but I can see a number of use cases when this might be a problem.

    For instance, in my app I have an in-memory cache:

    @Nullable CacheItem findCacheItem(long id);
    

    CacheItem might not be present in cache, so method might return null value.

    The way it is used with Rx* - is as following:

    Observable<CacheItem> getStream(final long id) {
        return Observable.fromCallable(new Callable<CacheItem>() {
            @Override public CacheItem call() throws Exception {
                return findCacheItem(id);
            }
        });
    }
    

    So with this approach, I might get null in my stream which is totally valid situation, so it is handled properly on receiving side - let's say UI changes its state if item is not present in cache:

    Observable.just(user)
              .map(user -> user.getName())
              .map(name -> convertNameToId(name))
              .flatMap(id -> getStream(id))
              .map(cacheItem -> getUserInfoFromCacheItem(cacheItem))
              .subscribe(
                  userInfo -> {
                      if(userInfo != null) showUserInfo();
                      else showPrompt();
                  }
              );
    

    With RxJava2 I am no longer allowed to post null down the stream, so I either need to wrap my CacheItem into some other class and make my stream produce that wrapper instead or make quite big architectural changes.

    Wrapping every single stream element into nullable counterpart doesn't look right to me.

    Am I missing something fundamental here?

    It seems like the situation like mine is quite popular, so Im curious what is the recommended strategy to tackle this problem given new "no null" policy in RxJava2?

    Question 
    opened by paveldudka 89
  • OnSubscribeRedo - fix race conditions

    OnSubscribeRedo - fix race conditions

    While searching for the cause of #2863 I bumped into this race condition (which doesn't fix #2863):

    If a request is made between L238 and L239 then consumerCapacity may become Long.MAX_VALUE on arriving at L239 in which case we don't wish to decrement it. To fix, used compareAndSet.

    What is interesting about this fix is that in the test loop of 5000 in OperatorRetryTest I see many more occurrences of the failure on average (3 -> 50) presumably because the extra time to perform the compareAndSet action has expanded the window for the race condition causing the failures.

    Bug 
    opened by davidmoten 83
  • Observer reference is not released on unsubscribe

    Observer reference is not released on unsubscribe

    With the following code http://pastebin.com/VUZ5aApe and using the latest version of rxjava (0.18.4),

    rotations should result in the activity calling unsubscribe on my subscription.

    This should release the reference to my observer, which should in turn release the reference to my activity.

    However, the memory tests I've done show that as I rotate the phone, my activity count keeps going up, which means the reference to my activity is not being released.

    (let me know if the pastebin expires for some reason, I'll get something back up)

    opened by fdoyle 74
  • Roadmap to 1.0

    Roadmap to 1.0

    I want everyone to know where we're heading and what's left before we hit 1.0.

    From the beginning we have allowed ourselves to do breaking changes on each 0.x release as we were aware that the design was not finished when we started this project. We are getting close to being done. Our goal is to release 1.0 in the coming months after stabilizing the API so that it can be relied upon without breaking every couple months.

    Project Structure

    When we hit 1.0 we intend on splitting out the language adaptors into their own top-level projects such as RxScala, RxClojure, RxGroovy, RxKotlin, RxJRuby etc.

    This will allow each project to iterate as needed at their own pace, especially since some will need to continue iterating while the RxJava core stabilizes. For example, if RxScala needs breaking changes it can bump it's major version while RxJava does not. This is particularly important to RxScala for handling changes such as Scala 2.10 -> 2.11 -> 2.12 etc.

    Major contrib modules will also be moved out, such as RxAndroid which also needs its own life-cycle.

    Outstanding Work

    The major items of work to be finished before 1.0 are:

    • ~~Backpressure: https://github.com/Netflix/RxJava/issues/1000~~ [Completed]
    • ~~Serialization Behavior: https://github.com/Netflix/RxJava/issues/998~~ [Completed in new merge implementation]
    • ~~Scheduler API: https://github.com/Netflix/RxJava/issues/997~~ Completed
    • ~~Remove all deprecated methods/classes~~ [Completed] #1053 #1621
    • ~~Finish migrating all operators to using lift and chaining the Subscription via Subscriber (current priority)~~ [Completed]

    The primary goal is to nail down the public API. New functionality can come in 1.1, 1.2, etc. The secondary goal is for all operators to work as advertised (regarding unsubscribe, back pressure and non-blocking). There will always be bugs, that's why 1.x.y will still be active after release, but the desire is to not need to ship 2.x soon after 1.x as this is a low level library that once entrenched becomes hard to migrate (we create significant pain at Netflix on each 0.x release).

    Going Forward

    Please comment if you feel there are other critical things to achieve before 1.0. The fastest way to getting us to 1.0 is helping us achieve the work stated above.

    Information 
    opened by benjchristensen 59
  • Version 0.17.0 Release Notes [Preview]

    Version 0.17.0 Release Notes [Preview]

    #0.17.0 Release Notes

    Version 0.17.0 contains some significant signature changes that allow us to significantly improve handling of synchronous Observables and simplify Schedulers. Many of the changes have backwards compatible deprecated methods to ease the migration while some are breaking.

    The new signatures related to Observable in this release are:

    // A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
    public final static <T> Observable<T> create(OnSubscribe<T> f)
    
    // The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
    public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
    
    // Subscriber is an Observer + Subscription
    public abstract class Subscriber<T> implements Observer<T>, Subscription
    
    // The main `subscribe` behavior receives a Subscriber instead of Observer
    public final Subscription subscribe(Subscriber<? super T> observer)
    
    // Subscribing with an Observer however is still appropriate
    // and the Observer is automatically converted into a Subscriber
    public final Subscription subscribe(Observer<? super T> observer)
    
    // A new 'lift' function allows composing Operator implementations together
    public <R> Observable<R> lift(Func1<Subscriber<? super R>, Subscriber<? super T>> lift)
    
    

    Also changed is the Scheduler interface which is much simpler:

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.

    As shown in the code above the changes fall into 2 major sections:

    1) Lift/OnSubscribe/Subscriber

    Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.

    2) Schedulers

    Simplification of the Scheduler interface and make clearer the concept of "outer" and "inner" Schedulers for recursion.

    Lift/OnSubscribe/Subscriber

    New types Subscriber and OnSubscribe along with the new lift operator have been added. The reasons and benefits are as follows:

    1) Synchronous Unsubscribe

    RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Observer<? super Integer> Observer) {
            for (int i = 1; i < 1000000; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
    

    Subscribing to this Observable will always emit all 1,000,000 values even if unsubscribed such as via oi.take(10).

    Version 0.17.0 fixes this issue by injecting the Subscription into the OnSubscribe function to allow code like this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // we now receive a Subscriber instead of Observer
            for (int i = 1; i < 1000000; i++) {
                // the OnSubscribe can now check for isUnsubscribed
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    
    });
    

    Subscribing to this will now correctly only emit 10 onNext and unsubscribe:

    // subscribe with an Observer
    oi.take(10).subscribe(new Observer<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            println("Received: " + t);
        }
    
    })
    

    Or the new Subscriber type can be used and the Subscriber itself can unsubscribe:

    // or subscribe with a Subscriber which supports unsubscribe
    oi.subscribe(new Subscriber<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            println("Received: " + t);
            if(t >= 10) {
                // a Subscriber can unsubscribe
                this.unsubscribe();
            }
        }
    
    })
    

    2) Custom Operator Chaining

    Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable is using static methods. This has meant code like this:

    MyCustomerOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
    

    In reality we want:

    observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
    

    Using the newly added lift we can get quite close to this:

    observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
    

    Here is how the proposed lift method looks if all operators were applied with it:

    Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
    

    Along with the lift function comes a new Operator signature:

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
    

    All operator implementations in the rx.operators package will over time be migrated to this new signature.

    3) Simpler Operator Implementations

    The lift operator injects the necessary Observer and Subscription instances (via the new Subscriber type) and eliminates (for most use cases) the need for manual subscription management. Because the Subscription is available in-scope there are no awkward coding patterns needed for creating a Subscription, closing over it and returning and taking into account synchronous vs asynchronous.

    For example, the body of fromIterable is simply:

    public void call(Subscriber<? super T> o) {
        for (T i : is) {
            if (o.isUnsubscribed()) {
                return;
            }
            o.onNext(i);
        }
        o.onCompleted();
    }
    

    The take operator is:

    public final class OperatorTake<T> implements Operator<T, T> {
    
        final int limit;
    
        public OperatorTake(int limit) {
            this.limit = limit;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> o) {
            CompositeSubscription parent = new CompositeSubscription();
            if (limit == 0) {
                o.onCompleted();
                parent.unsubscribe();
            }
            return new Subscriber<T>(parent) {
    
                int count = 0;
                boolean completed = false;
    
                @Override
                public void onCompleted() {
                    if (!completed) {
                        o.onCompleted();
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    if (!completed) {
                        o.onError(e);
                    }
                }
    
                @Override
                public void onNext(T i) {
                    if (!isUnsubscribed()) {
                        o.onNext(i);
                        if (++count >= limit) {
                            completed = true;
                            o.onCompleted();
                            unsubscribe();
                        }
                    }
                }
    
            };
        }
    
    }
    

    4) Recursion/Loop Performance

    The fromIterable use case is 20x faster when implemented as a loop instead of recursive scheduler (see https://github.com/Netflix/RxJava/commit/a18b8c1a572b7b9509b7a7fe1a5075ce93657771).

    Several places we can remove recursive scheduling used originally for unsubscribe support and use a loop instead.

    Schedulers

    Schedulers were greatly simplified to a design based around Action1<Inner>.

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This design change originated from three findings:

    1. It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.

    To solve this the new design explicitly has the outer Scheduler and then Scheduler.Inner for recursion.

    1. The passing of state is not useful since scheduling over network boundaries with this model does not work.

    In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler that attempted to use observeOn to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been bound to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.

    1. The number of overloads with different ways of doing the same things were confusing.

    This new design removes all but the essential and simplest methods.

    1. A scheduled task could not do work in a loop and easily be unsubscribed which generally meant less efficient recursive scheduling.

    This new design applies similar principles as done with lift/create/OnSubscribe/Subscriber and injects the Subscription via the Inner interface so a running task can check isUnsubscribed().

    WIth this new design, the simplest execution of a single task is:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
        }
    
    });
    

    Recursion is easily invoked:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
            inner.schedule(this);
        }
    
    });
    

    The use of Action1<Inner> on both the outer and inner levels makes it so recursion that refer to this and it works easily.

    Similar to the new lift/create pattern with Subscriber the Inner is also a Subscription so it allows efficient loops with unsubscribe support:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    

    An action can now unsubscribe the Scheduler.Inner:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                int i = doOtherWork();
                if(i > 100) {
                    // an Action can cause the Scheduler to unsubscribe and stop
                    inner.unsubscribe();
                }
            }
        }
    
    });
    

    Typically just stopping is sufficient:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            }
        }
    
    });
    

    but if other work in other tasks is being done and you want to unsubscribe conditionally you could:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            } else {
                inner.unsubscribe();
            }
        }
    
    });
    

    and the recursion can be delayed:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed ... but delay the recursion
            inner.schedule(this, 500, TimeUnit.MILLISECONDS);
        }
    
    });
    

    The methods on the Inner never return a Subscription because they are always a single thread/event-loop/actor/etc and controlled by the Subscription returned by the initial Scheduler.schedule method. This is part of clarifying the contract.

    Thus an unsubscribe controlled from the outside would be done like this:

    Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    
    // unsubscribe from outside
    s.unsubscribe();
    

    Migration Path

    1) Lift/OnSubscribe/Subscriber

    The lift function will not be used by most and is additive so will not affect backwards compatibility. The Subscriber type is also additive and for most use cases does not need to be used directly, the Observer interface can continue being used.

    The previous create(OnSubscribeFunc f) signature has been deprecated so code will work but now have warnings. Please begin migrating code as this will be deleted prior to the 1.0 release.

    Code such as this:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(Observer<? super Integer> o) {
            o.onNext(1);
            o.onNext(2);
            o.onCompleted();
            return Subscriptions.empty();
        }
    });
    

    should change to this:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onCompleted();
        }
    });
    

    If concurrency was being injected:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(final Observer<? super Integer> o) {
            final BooleanSubscription s = new BooleanSubscription();
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (s.isUnsubscribed()) {
                        o.onNext(i++);
                    }
                }
    
            });
            t.start();
            return s;
        }
    });
    

    you may no longer need it and can implement like this instead:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    });
    

    or can just simplify the Subscription management:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
            t.start();
        }
    });
    

    or use a Scheduler:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Schedulers.io().schedule(new Action1<Inner>() {
    
                @Override
                public void call(Inner inner) {
                    int i = 0;
                    while (subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
        }
    });
    

    or use subscribeOn which now works to make synchronous Observables async while supporting unsubscribe (this didn't work before):

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    }).subscribeOn(Schedulers.newThread());
    

    2) Schedulers

    Custom Scheduler implementations will need to be re-implemented and any direct use of the Scheduler interface will also need to be updated.

    3) Subscription

    If you have custom Subscription implementations you will see they now need an isUnsubscribed() method.

    You can either add this method, or wrap your function using Subscriptions.create and it will handle the isUnsubscribed behavior and execute your function when unsubscribe() is called.

    The Future...

    This is hopefully the last of the major refactors to rxjava-core and we're approaching version 1.0. We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create/subscribe signatures as we want and the Subscription and Scheduler interfaces are now clean.

    We need to improve on some of the Subject implementations still, particularly ReplaySubject. We are beginning to focus after this release on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.

    We appreciate your usage, feedback and contributions and hope the library is creating value for you!

    opened by benjchristensen 57
  • Proposed Scheduler Interface Change for 0.18 (yes, again)

    Proposed Scheduler Interface Change for 0.18 (yes, again)

    Reviewing the Scheduler interface changes of 0.17 with @headinthebox revealed that we're not 100% happy with the outcome, particularly after learning that Java 8 does not allow referencing this from within a lambda.

    The Scheduler interface as of 0.17 is:

    class Scheduler {
        public abstract Subscription schedule(Action1<Scheduler.Inner> action);
        public abstract Subscription schedule(Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
        public Subscription scheduleRecursive(Action1<Recurse> action);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public int degreeOfParallelism();
        public long now();
    
        public static final class Recurse {
            public void schedule() {
            public void schedule(long delay, TimeUnit unit) {
        }
    
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    We have determined two problems with this:

    1. Inner/Outer Dance

    In practice we have found that usage is always one of two things, either you just interact with the outer and don't care about the Inner, or you immediately need the Inner and have to do an awkward first scheduling just to get access to the Inner. (See here and weep.)

    1. Recursion

    The Action1<Scheduler.Inner> signature was chosen and put on both outer and inner so that an inner class could refer to itself using this to simply reschedule itself from the outer onto the inner.

    It was assumed this would work in Java 8 lambdas but unfortunately we did not prove it.

    This works with anonymous classes:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            System.out.println("do stuff");
            // recurse
            inner.schedule(this);
        }
    
    });
    

    but this does not with lambdas:

    Schedulers.newThread().schedule((inner) -> {
        System.out.println("do stuff");
        inner.schedule(this); // doesn't compile
    });
    

    So we end up with this:

    Schedulers.newThread().scheduleRecursive((recurse) -> {
        System.out.println("do stuff");
        recurse.schedule();
    });
    

    At that point it's clear that Inner is not working well and we have Recurse to fix the problem.

    Thus, the proposed changes (breaking again) are:

    class Scheduler {
        public final Subscription schedule(Action1<Recurse> action);
        public final Subscription schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        public final Subscription schedulePeriodically(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit);
        public abstract Inner createInner(); // for advanced use cases like `observeOn`
        public int degreeOfParallelism();
        public long now();
    
        // now the primary interface
        public static final class Recurse {
            public final void schedule();
            public final void schedule(long delay, TimeUnit unit);
            public final void schedule(Action1<Recurse> action);
            public final void schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        }
    
        // now mostly an implementation detail except for advanced use cases
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Recurse> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Recurse> action);
            public long now();
        }
    }
    

    The name of Recurse is up for debate. It may be possible to merge Recurse and Inner but I haven't figured it out yet. The reason is that Inner is a single instance representing a thread or event-loop whereas Recurse represents an Action or work. Thus a given Inner could have multiple Recurse actions scheduled on to it. It is being an Action that allows it to recurse by invoking schedule() that just reschedules itself.

    This would make it better support Java 8 lambdas and simply recursion, while also better supporting (via the createInner() method) the more complicated use cases like observeOn where current code is very awkward.

    This needs to be the last refactor of this so we nail it down and stop breaking things and can get to 1.0.

    Let the discussion begin ...

    opened by benjchristensen 55
  • Profiling Memory Usage and Object Creation

    Profiling Memory Usage and Object Creation

    We need to spend time profiling memory and object allocation and finding places where we can improve.

    I would really appreciate help diving into this and finding problem areas. Even if you don't fix them, but just identity use cases, operators, etc that would be very valuable.

    This is partly a result of the fact that in Netflix production we have seen an increase in YoungGen GCs since 0.17.x.

    The areas to start should probably be:

    • Observable.create
    • Observable.lift
    • Subscriber
    • CompositeSubscription
    • map
    • flatMap

    If you can or want to get involved in this please comment here so we all can collaborate together.

    opened by benjchristensen 54
  • Experimental Proposal of rx.Task

    Experimental Proposal of rx.Task

    Adds rx.Task as a "scalar Observable" for representing work with a single return value.

    See https://github.com/ReactiveX/RxJava/issues/1594 rx.Future/Task

    This provides a type similar to Future in that it represents a scalar unit of work, but it is lazy like an Observable and many Tasks can be combined into an Observable stream. Note how Task.zip returns Task<R> whereas Task.merge returns Observable<R>.

    NOTE: This is for experimentation and feedback at this time.

    Items requiring review and work that I'm particularly aware of:

    • naming of OnExecute
    • naming of TaskObserver (this one in particular I don't like)
    • design and implementation of Task.Promise
    • should the public lift use the Observable.Operator or should that only be for internal reuse?
    • should we have a public lift that uses a Task.Operator?
    • the Task.toObservable implementation right now is efficient but will likely break something so it likely needs to change to use subscribe
    • implementation of this merge variant: Task<T> merge(final Task<? extends Task<? extends T>> source)
    • several operators currently just wrap as Observable to reuse existing operators ... is that okay performance wise?
    • Javadocs

    Examples of using this class:

    import rx.Observable;
    import rx.Task;
    import rx.Task.Promise;
    
    public class TaskExamples {
    
        public static void main(String... args) {
            // scalar synchronous value
            Task<String> t1 = Task.create(t -> {
                t.onSuccess("Hello World!");
            });
    
            // scalar synchronous value using helper method
            Task<Integer> t2 = Task.just(1);
    
            // synchronous error
            Task<String> error = Task.create(t -> {
                t.onError(new RuntimeException("failed!"));
            });
    
            // executing
            t1.subscribe(System.out::println);
            t2.subscribe(System.out::println);
            error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));
    
            // scalar Tasks for request/response like a Future
            getData(1).subscribe(System.out::println);
            getDataUsingPromise(2).subscribe(System.out::println);
    
            // combining Tasks into another Task
            Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);
    
            // combining Tasks into an Observable stream
            Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
            Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));
    
            zipped.subscribe(v -> System.out.println("zipped => " + v));
            merged.subscribe(v -> System.out.println("merged => " + v));
            mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
        }
    
        /**
         * Example of an async scalar execution using Task.create
         * <p>
         * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
         *
         * @param arg
         * @return
         */
        public static Task<String> getData(int arg) {
            return Task.create(s -> {
                new Thread(() -> {
                    try {
                        Thread.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // deliver value
                        s.onSuccess("Data=" + arg);
                    }).start();
            });
        }
    
        /**
         * Example of an async scalar execution using a Task.Promise
         * <p>
         * This shows how an eager (hot) process would work like using a Future.
         *
         * @param arg
         * @return
         */
        public static Task<String> getDataUsingPromise(int arg) {
            Task.Promise<String> p = Promise.create();
    
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    p.onSuccess("Data=" + arg);
                }).start();
    
            return p.getTask();
        }
    }
    
    Enhancement 
    opened by benjchristensen 50
  • Adding super/extends so that Observable is covariant

    Adding super/extends so that Observable is covariant

    Ok, so this pull request changes a lot of lines. It's mostly generalizing all the FuncXs to be used like FuncX[-T1, -T2, ..., -TX, +R] (contravariant parameters, covariant return type) and all the Observers to be used "in a contravariant way". A few of the Observable uses are covariant, now, too (mostly zip).

    This is the pull request for #326.

    This doesn't look very good in the code (thanks Java). Also, it doesn't seem to make Scala interop easier at all (at least not yet).

    Please take a look. I'm not exactly happy with the result. - Maybe I'm doing something wrong here? - I've still got hope that there's an easier way...

    The pull request compiles and tests ok for me (except for the Clojure module, but that's another story and not due to my changes).

    opened by jmhofer 46
  • Add java.time to public API

    Add java.time to public API

    I'd like to add the java.time classes to the public API of RxJava. Mostly replacing the java.util.concurrent classes (specifically TimeUnit). Obviously deprecating the previous APIs is undesirable, though it may be wise in the future. I know updating public API changes are a big deal which is why this issue was created.

    One reason the change is valuable is that it reduces the number of method parameters, for example:

    public static Completable timer(long delay, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)

    to: public static Completable timer(Duration delay, @NonNull Scheduler scheduler)

    There currently appears to be 62 classes in RxJava that have this pattern which would need updating.

    opened by benjamintboyle 2
  • RxJava onSuccess or onError callback not working in particular cases

    RxJava onSuccess or onError callback not working in particular cases

    I am having trouble with RxJava + Retrofit and OkHttp Interceptor as onError or onSuccess callbacks never got triggered on specific scenario

    This is my interface class

    interface EndpointServices {
    
        companion object {
    
    private fun interceptor(): Interceptor {
                return Interceptor { chain ->
                    val request: Request = chain.request()
                    val originalResponse: Response = chain.proceed(request)
                    val cacheControlStatus: String? = originalResponse.header("Cache-Control")
    
                    Log.wtf("INTERCEPT",
                        "ORIGINAL : CACHE-CONTROL: $cacheControlStatus")
    
                    Log.wtf("INTERCEPT",
                        "OVERWRITE CACHE-CONTROL: ${request.cacheControl} | CACHEABLE? ${
                            CacheStrategy.isCacheable(originalResponse,
                                request)
                        }")
    
                    originalResponse.newBuilder()
                        .build()
    
                }
            }
    
    
            private fun onlineOfflineHandling(): Interceptor {
                return Interceptor { chain ->
                    try {
                        Log.wtf("INTERCEPT", "FETCH ONLINE")
                        val cacheControl = CacheControl.Builder()
                            .maxAge(5, TimeUnit.SECONDS)
                            .build()
    
                        val response = chain.proceed(chain.request().newBuilder()
                            .removeHeader("Pragma")
                            .removeHeader("Cache-Control")
                            .header("Cache-Control", "public, $cacheControl")
                            .build())
    
                        Log.wtf("INTERCEPT", "CACHE ${response.cacheResponse} NETWORK ${response.networkResponse}")
    
                        response
                    } catch (e: IOException) {
                        Log.wtf("INTERCEPT", "FALLBACK TO CACHE ${e.message}")
    
                        val cacheControl: CacheControl = CacheControl.Builder()
                            .maxStale(30, TimeUnit.DAYS)
                            .onlyIfCached() // Use Cache if available
                            .build()
    
                        val offlineRequest: Request = chain.request().newBuilder()
                            .cacheControl(cacheControl)
                            .build()
    
                        val response = chain.proceed(offlineRequest)
    
                        Log.wtf("INTERCEPT", "CACHE ${response.cacheResponse} NETWORK ${response.networkResponse}")
    
                        response
                    }
                }
            }
    
    
            fun create(baseUrl: String, cacheDir: File): EndpointServices {
    
                // Inexact 150 MB of maximum cache size for a total of 4000 assets where 1MB/30 assets
                // The remaining available space will be use for other cacheable requests
                val cacheSize: Long = 150 * 1024 * 1024
    
                val cache = Cache(cacheDir, cacheSize)
    
                Log.wtf("CACHE DIRECTORY", cache.directory.absolutePath)
    
                for (cacheUrl in cache.urls())
                    Log.wtf("CACHE URLS", cacheUrl)
    
                Log.wtf("CACHE OCCUPIED/TOTAL SIZE", "${cache.size()}/${cache.maxSize()}")
    
                /*val interceptor = HttpLoggingInterceptor()
                interceptor.level = HttpLoggingInterceptor.Level.BODY*/
    
                val httpClient = OkHttpClient.Builder()
                    .cache(cache)
                    /*.addInterceptor(interceptor)*/
                    .callTimeout(10, TimeUnit.SECONDS)
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .addNetworkInterceptor(interceptor())
                    .addInterceptor(onlineOfflineHandling())
                    .build()
    
                val retrofit = Retrofit.Builder()
                    .addCallAdapterFactory(
                        RxJava2CallAdapterFactory.create()
                    )
                    .addConverterFactory(
                        MoshiConverterFactory.create()
                    )
                    .client(httpClient)
                    .baseUrl(baseUrl)
                    .build()
    
                return retrofit.create(EndpointServices::class.java)
    
            }
    
    }
    
    @GET("{fullPath}")
        fun getExchangeItems(
            @Path("fullPath", encoded = true) fullPath: String,
            @Query("fields") fields: String
        ):
                Single<ExchangeItemModel>
    
    }
    
    

    Fetching it with this

    private val RetroService by lazy {
            EndpointServices.create(MySingleton.assetLink, application.cacheDir)
        }
    
    RetroService.getExchangeItems(
                MySingleton.exchange.replace("{ID}", assetName.trim().replace(" ", "-")),
                MySingleton.exchangeField
            )
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    { result ->
    
                        // Filter the item to display
                        includedAsset.value =
                            result.data.filter { it.exchangeName != null && !it.excludedFromPrice}
    
                        excludedAsset.value =
                            result.data.filter { it.exchangeName != null && it.excludedFromPrice}
    
                    },
                    { error ->
                        Log.wtf("WTF", "${error.message}")
                        FirebaseCrashlytics.getInstance().recordException(error)
                        // Only change the UI with this conditions
                        if ((includedAsset.value!!.isEmpty() && excludedAsset.value!!.isEmpty()) || (error is HttpException && error.code() == HttpURLConnection.HTTP_GATEWAY_TIMEOUT))
                            errorMessage.value = R.string.swipe_to_refresh
                    }
                ))
    

    Case 1: No network (Wifi/Mobile Data is OFF)

    • Interceptor : A/INTERCEPT: FALLBACK TO CACHE Unable to resolve host "some.host.com": No address associated with hostname

    • onError (error ->) is called if no cache

    • onSuccess (result ->) is called if cache is available

    All Good! We show error UI to user during offline mode if no cache exist else we show the list if cache is at least available.

    ========================================================================

    Case 2: Connected to network (Wifi/Mobile Data is ON and the network has INTERNET SERVICE)

    • onError (error ->) is called when response was failed such as 404, etc.

    • onSuccess (result ->) is called when response was success

    All Good! BUT due to unknown circumstances sometimes I got A/INTERCEPT: FALLBACK TO CACHE CANCELED from the Interceptor and when this happen I don't receive any callbacks neither onError nor onSuccess thus the UI for loading never ends.

    ========================================================================

    Case 3: Connected to network (Wifi/Mobile Data is ON but the network has NO INTERNET SERVICE)

    • Interceptor : A/INTERCEPT: FALLBACK TO CACHE Unable to resolve host "some.host.com": No address associated with hostname

    • onError (error ->) is not called

    • onSuccess (result ->) is not called

    As you can see, my Interceptor log here is just the same in our first case yet no callback has been return even a cache is available thus the UI for loading never ends again.

    dependencies

    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
        implementation 'com.squareup.retrofit2:converter-moshi:2.9.0'
    
        implementation 'com.squareup.okhttp3:logging-interceptor:4.9.0'
    
        implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
        // Because RxAndroid releases are few and far between, it is recommended you also
        // explicitly depend on RxJava's latest version for bug fixes and new features.
        // (see https://github.com/ReactiveX/RxJava/releases for latest 3.x.x version)
        implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
    
    Question 
    opened by ArcherEmiya05 5
  • 3x: Add full module descriptor

    3x: Add full module descriptor

    As described at https://github.com/ReactiveX/RxJava/issues/7240, add a full Java module descriptor to version 3.x.

    This setup lets the 3.x series remain Java 8 compatible while the module-info.class is compiled with Java 9+ and stored inside the jar at /META-INF/versions9.

    3.x Enhancement 
    opened by aalmiray 1
  • 3.x Consider adding a full Java module descriptor

    3.x Consider adding a full Java module descriptor

    Full Java module descriptors (module-info.class) are a hard requirement for building custom Java Runtimes with Jlink.

    It looks like the build must remain pre Java modules however given that the build is Gradle based there's the https://plugins.gradle.org/plugin/org.beryx.jar plugin that can be applied to keep the build in its current Java version while at the same time providing a full module descriptor. Caveat: the generated JAR will become a multi-release JAR.

    If there's any interest I'll gladly provide a PR and help with maintenance.

    Relates to: https://github.com/reactive-streams/reactive-streams-jvm/issues/531

    3.x Feature-Request 
    opened by aalmiray 6
  • FlowableConcatMapSchedulerTest > mapperDelayError2ScheduledLong failed

    FlowableConcatMapSchedulerTest > mapperDelayError2ScheduledLong failed

    io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapSchedulerTest > mapperDelayError2ScheduledLong FAILED
        java.lang.AssertionError: Not completed (latch = 1, values = 1, errors = 0, completions = 0, timeout!, disposed!)
            at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:125)
            at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:178)
            at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMapSchedulerTest.mapperDelayError2ScheduledLong(FlowableConcatMapSchedulerTest.java:1093)
    
    3.x Investigating Test-Failures 
    opened by akarnokd 0
  • `Flowable#groupBy` race leads to a back-pressure issue

    `Flowable#groupBy` race leads to a back-pressure issue

    Hi!

    While debugging https://github.com/reactor/reactor-core/issues/2352 we wanted to check whether RxJava has the same issue since, given the history of both projects :)

    Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):

    final int total = 100;
    
    Long count = Flowable.range(0, total)
                         .groupBy(i -> (i / 2) * 2)
                         .flatMapMaybe(Flowable::firstElement, false, 1)
                         .observeOn(Schedulers.io())
                         .count()
                         .blockingGet();
    assertThat(total - count).as("count").isZero();
    

    Gives (not 100% reliably, consider running in "rerun until failure" mode):

    io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
    
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)
    

    A few interesting observations:

    1. Changing observeOn's buffer size to 131 and higher makes it always pass
    2. 130 would sometimes fail with Unable to emit a new group (#99) due to lack of requests
    3. 129 would sometimes fail with Unable to emit a new group (#98) due to lack of requests
    4. 128 would sometimes fail with Unable to emit a new group (#97) due to lack of requests
    5. etc etc

    So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.

    3.x Question 
    opened by bsideup 4
  • 3.x: recursive concat causes StackOverflowError

    3.x: recursive concat causes StackOverflowError

    Originally posted on StackOverflow.

    The following code crashes with StackOverflowError and the stacktrace shows a long chain of request calls.

    import io.reactivex.rxjava3.core.Flowable
    import io.reactivex.rxjava3.core.Single
    import io.reactivex.rxjava3.schedulers.Schedulers
    import java.util.concurrent.TimeUnit.SECONDS
    
    fun main() {
        fun incr(n: Int): Single<Int> = Single.just(n + 1)
    
        fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
            if (n < max)
                incr(n)
                .observeOn(Schedulers.single())
                .toFlowable()
                .concatMap { next -> numbers(next, max) }
            else
                Flowable.empty()
        )
    
        numbers(1, 10_000)
        .blockingForEach(::println)
    }
    
    Exception in thread "main" java.lang.StackOverflowError
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    

    I'm not sure why there is such a chain created and if this is a result of an RxJava bug or not.

    3.x Investigating 
    opened by akarnokd 5
  • reactivex.github.io needs a new administrator

    reactivex.github.io needs a new administrator

    I'm no longer able to devote time to maintaining the reactivex.github.io site and its associated ReactiveX repo. There are some outstanding PRs that need review/merging, but in general there hasn't been a whole lot of activity there over the past months, so this is probably not a particularly demanding position.

    opened by DavidMGross 1
  • 3.x: parallel performs poorly with 10+ parallelism

    3.x: parallel performs poorly with 10+ parallelism

    For some reason, the parallel Scrabble benchmark performs poorly when the parallelism level is 10+, for example, on my i7 8700 CPU (6 cores/12 threads):

    image

    However, my older i7 4770K processor (4 cores/8 threads) shows no such performance degradation. ~~Neither does the reactive-streams-commons implementation (the parent of RxJava's parallel implementation) with parallelism=12.~~ Correction: The Rsc benchmark was pinned to 8 threads and actually shows a similar inefficiency with 10+.

    3.x Performance 
    opened by akarnokd 3
  • Update wiki to reflect 3.x (tracking issue)

    Update wiki to reflect 3.x (tracking issue)

    This is the overview of the suggested/planned changes to the Wiki. Text in bold are for extra considerations/options.

    3.x Discussion Documentation PR welcome 
    opened by akarnokd 22
Releases(v3.0.13-RC4)
  • v3.0.13-RC4(May 3, 2021)

  • v3.0.13-RC3(Apr 26, 2021)

  • v3.0.13-RC2(Apr 17, 2021)

  • v3.0.13-RC1(Apr 17, 2021)

  • v3.0.12(Apr 8, 2021)

    Maven JavaDocs

    Bugfix

    • CompositeException.printStackTrace to write directly into PrintStream/PrintWriter. (#7212)

    Documentation

    • Fix wrong reference in Single.flattenStreamAsObservable javadoc. (#7206)
    • Fix style violating Javadoc. (#7210)

    Other

    • Fix POM_URL (#7214)
    • Upgrade Gradle to 6.8.3 (#7208)
    • Bump gradle to 6.8.3 & optimize gradle config (#7207)
    • Added Javadoc checks to Checkstyle. Fix violating Javadoc. (#7210)
    • Modernize gradle plugin block, change maven to maven-publish (#7219)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.12-RC1(Apr 8, 2021)

  • v3.0.11(Mar 6, 2021)

    Maven JavaDocs

    ℹ️ RxJava 2 is now end-of-life (EOL) and no further development or support will be provided by the project.

    Enhancement

    • Add onSubscribe hook to ParallelFlowable operators (#7191)

    Bugfix

    • Allow Single.zip and Maybe.zip result to be garbage collected (#7196)
    • Direct scheduling via Schedulers.from to honor the interruptibleWorker setting (#7203)

    Documentation

    • Fix typos in Schedulers.java (#7178)

    Other

    • Release to Sonatype directly (#7181)
    • Upgrade to Gradle 6.8.2 (#7184)
    • Cleanup of source code headers (#7205)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC5(Feb 16, 2021)

  • v3.0.11-RC4(Feb 13, 2021)

  • v2.2.21(Feb 13, 2021)

    Maven JavaDocs

    :warning: This is the last planned update for the 2.x version line. After February 28, 2021, 2.x becomes End-of-Life (EoL); no further patches, bugfixes, enhancements, documentation or support will be provided by the project.

    Enhancements

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7162)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7170)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC3(Feb 5, 2021)

  • v3.0.11-RC2(Feb 5, 2021)

    Specify the staging profile name to be "io.reactivex" so the close operation finds the repo.

    Unfortunately, there is no other way to test the release process.

    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC1(Feb 5, 2021)

  • v3.0.10(Feb 1, 2021)

    Maven JavaDocs

    Enhancement

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7160)
    • Add TestScheduler option to use onSchedule hook. (#7163)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7169)
    • Add fusion support to concatMap{Maybe|Single|Completable}. (#7165)

    Documentation

    • Update marbles of amb(), ambArray() and ambWith() (#7144)
    • Fix take() mentioning the old limit() operator (#7145)
    • Document Schedulers.from vs. RejectedExecutionException behavior. (#7150)
    • Update documentation for NewThreadWorker.scheduleActual method. (#7164)
    • Improve Javadocs style of Schedulers. (#7168)

    Other

    • onReduceBackpressure internals cleanup (#7151)
    • Workaround for FutureTask.toString recursion on JDK 10+. (#7173)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.9(Dec 30, 2020)

  • v3.0.8(Dec 2, 2020)

  • v3.0.8-RC3(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.8-RC2(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.8-RC1(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.7(Oct 7, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Observable.toFlowable(ERROR) not cancelling on MissingBackpressureException. (#7083)
    • Fix Flowable.concatMap backpressure with scalars. (#7089)

    Documentation

    • fromRunnable/fromAction javadoc improvements. (#7071)
    • Patch out duplicate @NonNull annotation in generated javadocs. (#7073)
    • Clarify the documentation for scan operators. (#7093)
    Source code(tar.gz)
    Source code(zip)
  • v2.2.20(Oct 6, 2020)

    Maven JavaDocs

    :warning: The 2.x version line is now in maintenance mode and will be supported only through bugfixes until February 28, 2021. No new features, behavior changes or documentation adjustments will be accepted or applied to 2.x. It is recommended to migrate to 3.x within this time period.

    Bugfixes

    • Fix Observable.flatMap with maxConcurrency hangs (#6960)
    • Fix Observable.toFlowable(ERROR) not cancelling upon MissingBackpressureException (#7084)
    • Fix Flowable.concatMap backpressure with scalars. (#7091)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.6(Aug 20, 2020)

  • v3.0.5(Jul 31, 2020)

  • v3.0.4(May 21, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Flowable.groupBy eviction logic double decrement and hang. (#6975)
    • Fix Flowable.groupBy cancellation/cleanup/eviction race hangs. (#6979)
    • Disable fusion on the groups of Flowable.groupBy. (#6983)
    • Fix Flowable.groupBy eviction-completion-replenishment problems. (#6988)
    • Removed unnecessary upstream.cancel() call for casually finished upstream sequences. (#6992)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.3(May 1, 2020)

    Maven JavaDocs

    Enhancements

    • Allow setting the drift tolerance timeunit via system property rx3.scheduler.drift-tolerance-unit. (#6969)

    Bugfixes

    • Fix scheduled tasks' fatal exception behavior. (#6956)

    Documentation

    • Quick Javadoc fixes. (#6943)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.2(Apr 6, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Observable.flatMap with maxConcurrency hangs. (#6946)

    Documentation

    • Add see annotation for range operators. (#6934)
    • Update images and their JavaDocs URLs to non-transparent version. (#6944)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.1(Mar 14, 2020)

  • v2.2.19(Mar 13, 2020)

    Maven JavaDocs

    :warning: The 2.x version line is now in maintenance mode and will be supported only through bugfixes until February 28, 2021. No new features, behavior changes or documentation adjustments will be accepted or applied to 2.x. It is recommended to migrate to 3.x within this time period.

    Bugfixes

    • Commit 7980c85b: Fix switchMap not canceling properly during onNext-cancel races.
    Source code(tar.gz)
    Source code(zip)
  • v2.2.18(Feb 21, 2020)

    Maven

    :warning: The 2.x version line is now in maintenance mode and will be supported only through bugfixes until February 28, 2021. No new features, behavior changes or documentation adjustments will be accepted or applied to 2.x. It is recommended to migrate to 3.x within this time period.

    Bugfixes

    • Pull 6894: Fix groupBy not requesting more if a group is cancelled with buffered items.
    Source code(tar.gz)
    Source code(zip)
  • v3.0.0(Feb 14, 2020)

    Maven JavaDocs

    We are happy to announce the release of RxJava 3.0.0 final.

    Please read the wiki page What's different in 3.0 for the details on the various changes compared to RxJava 2.x. The page also doubles as a migration guide.

    Please read the wiki page carefully before posting about common and expected migration issues such as wrong imports, changed or missing methods or using the wrong maven address.

    The project would like to thank the following contributors for their tireless effors improving RxJava 3:

    vjgarciag96, slisaasquatch, pestrada, JLLeitschuh, dvolkovv, JakeWharton, hepin1989, richardkapolnai-da, JosemyDuarte, io7m, arriolac, davidmoten, vanniktech, RomanWuattier, Erlkoenig90, hluhovskyi, luis-cortes, slievrly, maksim-m

    In addition, we would like to thank our regular and diligent reviewers, @JakeWharton and @vanniktech for their continued support of the project.

    Changes since 3.0.0-RC9

    There were no functional, API or behavior changes since RC9.

    3.0.0 development statistics

    Time: 8 months 8 days; or 253 days since the fork from 2.2.7. Issues closed: 107 PRs: 178 Files

    • Changed: 3,538
    • Lines aded: 465,505
    • Lines deleted: 420,753
    Source code(tar.gz)
    Source code(zip)
Owner
ReactiveX
Reactive Extensions for Async Programming
ReactiveX
Immutable in-memory R-tree and R*-tree implementations in Java with reactive api

rtree In-memory immutable 2D R-tree implementation in java using RxJava Observables for reactive processing of search results. Status: released to Mav

Dave Moten 860 Mar 10, 2021
Reactive Streams Utilities - Future standard utilities library for Reactive Streams.

Reactive Streams Utilities This is an exploration of what a utilities library for Reactive Streams in the JDK might look like. Glossary: A short gloss

Lightbend 59 Mar 9, 2021
Implementation of various string similarity and distance algorithms: Levenshtein, Jaro-winkler, n-Gram, Q-Gram, Jaccard index, Longest Common Subsequence edit distance, cosine similarity ...

java-string-similarity A library implementing different string similarity and distance measures. A dozen of algorithms (including Levenshtein edit dis

Thibault Debatty 2.3k Mar 13, 2021
Reactive Programming for Android

Reactive Programming for Android Agera is a set of classes and interfaces to help write functional, asynchronous, and reactive applications for Androi

Google 7.3k Mar 4, 2021
An advanced, but easy to use, platform for writing functional applications in Java 8.

Getting Cyclops X (10) The latest version is cyclops:10.4.0 Stackoverflow tag cyclops-react Documentation (work in progress for Cyclops X) Integration

AOL 1.2k Mar 5, 2021
Library for creating In-memory circular buffers that use direct ByteBuffers to minimize GC overhead

Overview This project aims at creating a simple efficient building block for "Big Data" libraries, applications and frameworks; thing that can be used

Tatu Saloranta 130 Dec 10, 2020
Micro second messaging that stores everything to disk

Chronicle Queue Contents Table of Contents Contents About Chronicle Software What is Chronicle Queue Java Docs Usage More benchmarks Downloading Chron

Chronicle Software : Open Source 2.3k Mar 13, 2021
A Java implementation of Transducers

transducers-java Transducers are composable algorithmic transformations. They are independent from the context of their input and output sources and s

null 113 Oct 25, 2020
Zero-allocation hashing for Java

Zero-Allocation Hashing Version Overview This project provides a Java API for hashing any sequence of bytes in Java, including all kinds of primitive

Chronicle Software : Open Source 556 Mar 12, 2021
Lightweight threads for Java, with message passing, nio, http and scheduling support.

Kilim: Continuations, Fibers, Actors and message passing for the JVM

Sriram Srinivasan 1.6k Mar 14, 2021
LMDB for Java

LMDB JNI LMDB JNI provide a Java API to LMDB which is an ultra-fast, ultra-compact key-value embedded data store developed by Symas for the OpenLDAP P

deephacks 195 Feb 26, 2021
A high performance caching library for Java

Caffeine is a high performance, near optimal caching library. For more details, see our user's guide and browse the API docs for the latest release. C

Ben Manes 9.5k Mar 13, 2021
gRPC and protocol buffers for Android, Kotlin, and Java.

Wire “A man got to have a code!” - Omar Little See the project website for documentation and APIs. As our teams and programs grow, the variety and vol

Square 3.4k Mar 11, 2021
RTree2D is a 2D immutable R-tree with STR (Sort-Tile-Recursive) packing for ultra-fast nearest and intersection queries

RTree2D RTree2D is a 2D immutable R-tree with STR (Sort-Tile-Recursive) packing for ultra-fast nearest and intersection queries. Goals Main our requir

Andriy Plokhotnyuk 92 Mar 13, 2021