Room with Flowable: initialize database if it's empty


I have the following @Dao, which provides a Flowable<List<User>> stream:

@Dao
interface UsersDao {
    @Query("SELECT * FROM users")
    fun loadUsers(): Flowable<List<User>>
}

I want the subscriber of the stream to receive updates as soon as the database changes. Subscribing to Room's Flowable gives me that out of the box.

What I want is the following: if the database is empty, perform a web request and save the users into the database. The subscriber will then automatically receive the newly inserted data.

I don't want the client of the repository to be aware of any of the initialization logic: all it does is call usersRepository.loadUsers(). All of this magic should take place inside the repository class:

class UsersRepository @Inject constructor(
    private val api: Api,
    private val db: UsersDao
) {
    fun loadUsers(): Flowable<List<User>> { ... }
}

Of course I can use the following approach:

fun loadUsers(): Flowable<List<User>> {
    return db.loadTables()
        .doOnSubscribe {
            if (db.getCount() == 0) {
                val list = api.getTables().blockingGet()
                db.insert(list)
            }
        }
}

But I would like to construct the stream without using side-effects (doOn... operators). I've tried compose() (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#compose-io.reactivex.FlowableTransformer-) but that didn't help much. I've been stuck on this for a while.


You could apply some conditional flatMaps:

@Dao
interface UsersDao {
    @Query("SELECT * FROM users")
    fun loadUsers(): Flowable<List<User>>

    @Query("SELECT COUNT(1) FROM users")
    fun userCount(): Flowable<List<Int>>

    @Insert // I don't know Room btw.
    fun insertUsers(users: List<User>): Flowable<Any>
}

interface RemoteUsers {
    fun getUsers(): Flowable<List<User>>
}

fun getUsers(): Flowable<List<User>> {
    return db.userCount()
        .take(1) // only the first count matters for the "is it empty?" check
        .flatMap { counts ->
            if (counts.isEmpty() || counts[0] == 0) {
                // Empty table: fetch remotely, insert, then switch to the live query
                remote.getUsers()
                    .flatMap { users -> db.insertUsers(users) }
                    .ignoreElements()
                    .andThen(db.loadUsers())
            } else {
                db.loadUsers()
            }
        }
}

Disclaimer: I don't know Room, so please adapt the example above to whatever its features allow.
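For what it's worth, here is a minimal sketch of the same conditional idea using Single and Completable instead of Flowable for the count and the insert. It assumes your Room version can generate those return types for @Query and @Insert methods; I have left the annotations out so the snippet stands on its own. The Api interface and the DAO method names here are hypothetical, not taken from the question.

```kotlin
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Single

data class User(val id: Int, val name: String)

// Hypothetical DAO shape (assumption): with Room these would be
// @Query / @Insert methods on a @Dao interface.
interface UsersDao {
    fun loadUsers(): Flowable<List<User>>
    fun userCount(): Single<Int>
    fun insertUsers(users: List<User>): Completable
}

// Hypothetical remote API (assumption).
interface Api {
    fun getUsers(): Single<List<User>>
}

class UsersRepository(private val api: Api, private val db: UsersDao) {
    // No doOn... operators: the seeding step is part of the chain itself.
    fun loadUsers(): Flowable<List<User>> =
        db.userCount()
            .flatMapCompletable { count ->
                if (count == 0) {
                    // Empty table: fetch from the network and insert.
                    api.getUsers().flatMapCompletable { users -> db.insertUsers(users) }
                } else {
                    Completable.complete()
                }
            }
            // Only after the optional seeding finishes, hand over the live query.
            .andThen(db.loadUsers())
}
```

The client still just calls usersRepository.loadUsers(); the count check and the optional network call run once per subscription, before the live query is subscribed.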


Assuming your insert() call is async and also handles updates, you could do something like this:

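fun loadUsers(): Flowable<List<User>> =
    userDao.getAllUsers()
        // Pass the fallback Flowable in parentheses; a trailing lambda here would be
        // SAM-converted into a Publisher that never emits.
        .switchIfEmpty(api.getAllUsers().doOnNext { userDao.insert(it) })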

You could also use flatMap:

fun loadUsers(): Flowable<List<User>> =
    userDao.getAllUsers().flatMap { users ->
        if (users.isEmpty()) api.getAllUsers().doOnNext { userDao.insert(it) }
        else Flowable.just(users)
    }


You should also consider the case where the stored data is stale. In that case you need to go the other way around: start the network request and the database query at the same time, and display whichever result arrives first. The database should be updated as soon as the network call completes.
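A rough sketch of that idea, in case it helps. FakeUsersDao and FakeApi are hypothetical in-memory stand-ins (a BehaviorProcessor plays the role of Room's self-updating query), so treat the names and shapes as assumptions rather than anything from the question.

```kotlin
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.processors.BehaviorProcessor

data class User(val id: Int, val name: String)

// Hypothetical stand-in for the Room DAO: emits the current rows and every later change.
class FakeUsersDao {
    private val table = BehaviorProcessor.createDefault(emptyList<User>())
    fun loadUsers(): Flowable<List<User>> = table
    fun insert(users: List<User>) = table.onNext(users)
}

// Hypothetical stand-in for the web API.
class FakeApi {
    fun getUsers(): Single<List<User>> = Single.just(listOf(User(1, "Ann")))
}

class UsersRepository(private val api: FakeApi, private val db: FakeUsersDao) {
    fun loadUsers(): Flowable<List<User>> {
        // The refresh emits nothing itself; it only writes to the database,
        // and the database stream then delivers the fresh rows.
        val refresh = api.getUsers()
            .flatMapCompletable { users -> Completable.fromAction { db.insert(users) } }
            .onErrorComplete() // assumption: on network failure, keep showing cached data
            .toFlowable<List<User>>()

        // Subscribe to the database stream and the refresh together.
        return Flowable.merge(db.loadUsers(), refresh)
    }
}
```

The subscriber sees whatever is cached right away and then the refreshed rows once the insert lands. With a real network call you would probably add subscribeOn(Schedulers.io()) to the refresh so it does not block the subscribing thread.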


