Using streams to manage side effects in the Apollo cache.
When we typically talk about fetching data, we usually think about calling a function that makes a network request to a server, or a query to a local cache that returns a value back to us asynchronously. Streams take this pattern a step further by allowing us to receive not just a single value, but multiple values over time.
Our code can subscribe to a particular stream and then asynchronously receive data whenever the stream emits new data. You can visualize this streaming concept as a pipe, where on one end, a data source can put data into the pipe, and on the other end subscribers receive that data.
If you’ve worked with the Apollo library before, you’re most likely familiar with some of its APIs for accessing data like useQuery
hook. This React hook takes a given query and subscribes to the Apollo cache so that the component updates whenever that data our query cares about changes.
In the example below, our component is subscribing to changes in the Apollo cache for the employees
field:
import React from 'react';
import { useQuery, gql } from '@apollo/client';
const Component = () => {
const { data } = useQuery(
gql`
query GetEmployees {
employees {
id
name
team
}
}
`,
)
}
Apollo uses streams under the hood to manage informing clients of changes to the employees
field. When the employees
field changes, our useQuery
subscription is piped a new value from the cache and our component re-renders.
So far we’ve seen how to query the server for data, but what about querying for data derived from the cache? What if we only wanted to be read data from the cache for employees of a particular team?
Apollo makes this possible with Type Policies. Type policies allow us to create derived data fields that act as views into the cache data. If we wanted to keep track of all the banking team employees, we could create a type policy like the following:
import { InMemoryCache } from '@apollo/client';
const cache = InMemoryCache({
typePolicies: {
Query: {
fields: {
bankingEmployees: {
read(_existingEmployees, { readField }) {
const employeeReferences = readField('employees') ?? [];
return employeeReferences.filter(employeeRef => readField('team', employeeRef) === 'Banking');
}
}
}
}
}
})
Our type policy is built on top of the employees
field we queried for earlier and filters down the list of employees to ones from the team we care about. We can then set up a new query that is subscribed to our bankingEmployees
field:
import React from 'react';
import { useQuery, gql } from '@apollo/client';
const Component = () => {
const { data } = useQuery(
gql`
query GetBankingEmployees {
bankingEmployees @client {
id
name
team
}
}
`,
)
}
Now our components can retrieve a list of banking employees from the cache and because type policy fields are memoized like any other field, they won’t re-render or get piped new data unnecessarily until a change is made to invalidate them.
Type policies are a great tool for subscribing to data we care about in the cache. But what if what we want to know is just the fact that something changed, not any particular data? Our type policy for the bankingEmployees
field is is a data stream. Whenever the banking employees change, it emits the updated array of them to us. An event stream could be something more like:
when a new banking employee is added, fire an analytics event
Our first thought could be to create a type policy that picks up changes to the banking employees field just like we did before. Let’s see what that would look like:
import _ from 'lodash';
import { InMemoryCache } from '@apollo/client';
const cache = InMemoryCache({
typePolicies: {
Query: {
fields: {
onBankingEmployeesChanged: {
read(existingBankingEmployees, { readField }) {
const newBankingEmployees = readField('bankingEmployees');
const haveBankingEmployeesChanged = _.xorBy(
newBankingEmployees,
existingBankingEmployees,
(ref) => ref.__ref
).length > 0;
if (haveBankingEmployeesChanged) {
fireAnalyticsEvent();
}
}
}
}
}
}
});
In order for this type policy to fire, some query will need to subscribe to this field since Apollo won’t re-compute fields that aren’t actively being listened to. The part that is a little strange is just the fact that our event for when a banking employee changes is now its own field in the cache that can be queried for. This field has no return value like other fields do and it’s not used for its data, it is just a mechanism for triggering side effects.
It would be nice to have a way to subscribe to changes in the cache for the purpose of executing side effects and without having to add new fields. We can create this ourselves by taking advantage of how Apollo is built on streams.
The useQuery
hook we looked at earlier allows components to subscribe to changes to the fields they care about in the Apollo cache. Under the hood, useQuery
indirectly triggers the ApolloClient.watchQuery
API to create this observable stream of changes for the given query.
public watchQuery<T = any, TVariables = OperationVariables>(
options: WatchQueryOptions<TVariables, T>,
): ObservableQuery<T, TVariables> => {...}
As we can in the watchQuery
function signature, it will be returning an observable query object. ObservableQuery
extends the Observable
API from the Zen Observable library, which is Apollo’s chosen library for streaming data.
The observable returned from watchQuery
can then be subscribed to in order to receive a stream of changes. By working directly with the watchQuery
API, we can built streams for handling side effects without introducing new fields and type policies.
Our first step is to convert the Zen Observable
we get from watchData
to an RxJS observable. While everything we’re about to do is possible with Zen Observable
, it’s a relatively small library with only a handful of stream utilities built-in. By using RxJS, we’ll be able to handle a broader set of streaming use cases without needing to roll our own toolbox of utilities and helpers. You can read more about RxJS here. We can convert the watchQuery
response to use RxJS as shown below:
import {
WatchQueryOptions,
Observable as ZenObservable,
} from '@apollo/client';
import { Observable as RxObservable, Subject, Subscription } from 'rxjs';
const zenToRx = <T>(zenObservable: ZenObservable<T>): Subject<T> => {
const observable = new RxObservable<T>(observer =>
zenObservable.subscribe(observer)
);
const subject = new Subject<T>();
observable.subscribe(subject);
return subject;
};
export const streamQuery = <TVariables, TData>(
options: WatchQueryOptions<TVariables, TData>
) => {
return zenToRx(ZenObservable.from(apolloClient.watchQuery(options)));
};
We can then re-create our banking employees analytics side effect using a streamed query:
import { gql } from '@nerdwallet/apollo';
import { map, pairwise, startWith, tap } from 'rxjs/operators';
import { streamQuery } from './streams';
const bankingEmployeesQuery = gql`
query GetBankingEmployees {
readBankingEmployees @client {
id
}
}
`;
export const bankingEmployeesChangedStream: Observable<void> = streamQuery({
query: bankingEmployeesQuery,
}).pipe(
map(result => result?.data?.readBankingEmployees),
startWith([]),
pairwise(),
filter(([existingBankingEmployees, nextBankingEmployees]) => {
return _.xorBy(
newBankingEmployees,
existingBankingEmployees,
(ref) => ref.__ref
).length > 0;
}),
);
If you have used RxJS before, the stream operators we use here are likely pretty familiar, but for the rest of us let’s break down what’s going on in this new cache subscription.
We first call streamQuery
with the query we will subscribe to in the Apollo cache to get the banking employees. We then call the RxJS observable pipe
function that allows us to use a set of pipeable operators
like map
, startWith
, pairwise
and filter
to transform our stream.
Let’s go through the chain of operators one by one:
map
operator to transform our query response to extract the bankingEmployees
field we care about.startWith
operator.pairwise
operator transforms our stream to deliver an array of the previous value and the current value from the input stream instead of just the current value. This allows us to compare the current and new value like we were able to with type policies.filter
is analogous to Array.filter
and transforms the stream to omit events that don’t pass the filter’s test. We use filter
to only emit events when the banking employees have changed.Our new bankingEmployeesChangedStream
can now be subscribed to anywhere in our code to start watching for changes as shown below:
import { bankingEmployeesChangedStream } from './banking';
const subscription = bankingEmployeesChangedStream.subscribe(() => {
fireAnalyticsEvent();
});
// Later
subscription.unsubscribe();
Note: When we
subscribe
to the stream, it returns a stream subscription that we can then unsubscribe from later on if we want to stop listening to changes.
What about using our side-effect stream in React components? We can easily accomplish this with a stream hook:
export function useStream<TData = any>(
stream: Observable<TData>,
callback: (data: TData) => any
): Subscription {
const streamSubscriptionRef = useRef<Subscription>();
useUnmount(() => {
const streamSubscription = streamSubscriptionRef.current;
if (streamSubscription && !streamSubscription.closed) {
streamSubscription.unsubscribe();
}
});
if (!streamSubscriptionRef.current) {
streamSubscriptionRef.current = stream.subscribe(data => {
callback(data);
});
}
return streamSubscriptionRef.current;
}
Now our React components can handle side effects with useStream
similarly to how they accessed data with useQuery
. Using our banking employees side effect stream in a component can then look like this:
import React from 'react';
import { useStream } from '../streams';
const Component = () => {
useStream(
bankingEmployeesChangedStream,
() => {
fireAnalyticsEvent();
}
)
};
That’s all we have for now on handling side effects in the Apollo cache with streams. Hopefully we’ve clarified how Apollo and tools like useQuery
work under the hood to deliver data updates and shown that with some tweaking, we can use the same APIs to effectively hook into side effects.