Adventures in Data Land
Distributed synchronization with the distributed star

Here’s a simple synchronization paradigm between many computers that scales with the number of machines involved and which essentially keeps cost at \(O(1)\) per machine. For lack of a better name I’m going to call it the distributed star since this is what the communication looks like. It’s quite similar to how memcached stores its (key,value) pairs. 

Assume you have n computers, each of which has a copy of a large parameter vector w (typically several GB), and we would like to keep these copies approximately synchronized.

A simple version would be to pause the computers occasionally, have them send their copies to a central node, and then return with a consensus value. Unfortunately this takes \(O(|w| \log n)\) time if we aggregate things on a tree (we can reduce it by streaming data through but this makes the code a lot more tricky). Furthermore we need to stop processing while we do so. The latter may not even be possible and any local computation is likely to benefit from having most up-to-date parameters. 

Instead, we use the following: assume that we can break up the parameter vector into smaller (key, value) pairs that need synchronizing. We now have each computer send its local changes for each key to a central server, update the parameters there, and later receive information about global changes. So far this algorithm looks stupid - after all, when using n machines it would require \(O(|w| n)\) time to process since the central server is the bottleneck. This is where the distributed star comes in. Instead of keeping all data on a single server, we use the well known distributed hashing trick and send it to a machine n from a pool P of servers:

$$n(\mathrm{key}, P) = \mathop{\mathrm{argmin}}_{n \in P} ~ h(\mathrm{key}, n)$$

Here h is the hash function. Such a system spreads communication evenly and it leads to an \(O(|w| n/|P|)\) load per machine. In particular, if we make each of the computers involved in the local computation also members of the pool, i.e. if we have \(n = |P|\) we get an \(O(|w|)\) cost for keeping terms synchronized regardless of the number of machines involved. 
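Here is a minimal sketch of the argmin assignment above. The hash built from BLAKE2b, the machine names and the key format `w[k]` are all placeholders chosen for illustration, not what our system uses.

```python
import hashlib
from collections import Counter

def h(key: str, server: str) -> int:
    """Hypothetical hash h(key, n): a 64-bit integer derived from BLAKE2b."""
    digest = hashlib.blake2b(f"{key}|{server}".encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big")

def assign(key: str, pool: list) -> str:
    """Return the server n in the pool P that minimizes h(key, n)."""
    return min(pool, key=lambda server: h(key, server))

# Every machine can evaluate assign() locally, so no lookup table is needed.
pool = [f"machine-{i}" for i in range(8)]
load = Counter(assign(f"w[{k}]", pool) for k in range(10_000))
print(sorted(load.values()))  # roughly 10000 / 8 keys per machine
```

A side effect of the argmin rule is that removing a machine from the pool only reassigns the keys that machine owned; all other keys keep their server.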

Obvious approximations: we assume that all machines are on the same switch. Moreover we assume that the times to open a TCP/IP connection are negligible (we keep them open after the first message) relative to the work to transmit the data. 

The reason I’m calling this a distributed star is that for each key we have a star communication topology, it’s just that we use a different star for each key. If anyone in systems knows what this thing is really called, I’d greatly appreciate feedback. Memcached uses the same setup, alas it doesn’t have versioned writes and callbacks, so we had to build our own system using ICE.

Speeding up Latent Dirichlet Allocation

The code to our LDA implementation on Hadoop is released on Github under the Mozilla Public License. It’s seriously fast and scales very well to 1000 machines or more (don’t worry, it runs on a single machine, too). We believe that at present this is the fastest implementation you can find, in particular if you want to have a) 1000s of topics, b) a large dictionary, c) a large number of documents, and d) Gibbs sampling. It comfortably handles a billion documents. Shravan Narayanamurthy deserves all the credit for the code. The paper describing an earlier version of the system appeared in VLDB 2010.

Some background: Latent Dirichlet Allocation by Blei, Jordan and Ng (JMLR 2003) is a great tool for aggregating terms beyond what simple clustering can do. While the original paper showed exciting results it wasn’t terribly scalable. A significant improvement was the collapsed sampler of Griffiths and Steyvers (PNAS 2004). The key idea was that in an exponential families model with conjugate prior you can integrate out the natural parameter, thus providing a sampler that mixed much more rapidly. It uses the following update equation to sample the topic for a word.

$$p(t|d,w) \propto \frac{n^*(t,d) + \alpha_t}{n^*(d) + \sum_{t'} \alpha_{t'}} \frac{n^*(t,w) + \beta_w}{n^*(t) + \sum_{w'} \beta_{w'}}$$

Here t denotes the topic, d the document, w the word, and \(n(t,d), n(d), n(t,w), n(t)\) denote the number of words which satisfy a particular (topic, document), (document), (topic, word), (topic) combination. The starred quantities such as \(n^*(t,d)\) simply mean that we use the counts where the current word for which we need to resample the topic is omitted.

Unfortunately the above formula is quite slow when it comes to drawing from a large number of topics. Worst of all, it is nonzero throughout. A rather ingenious trick was proposed by Yao, Mimno, and McCallum (KDD 2009). It uses the fact that the relevant terms in the sum are sparse and only the \(\alpha\) and \(\beta\) dependent terms are dense (and obviously the number of words per document doesn’t change, hence we can drop that, too). This yields

$$p(t|d,w) \propto \frac{\alpha_t \beta_w}{n^*(t) + \sum_{w'} \beta_{w'}} + n^*(t,d) \frac{n^*(t,w) + \beta_w}{n^*(t) + \sum_{w'} \beta_{w'}} + \frac{\alpha_t n^*(t,w)}{n^*(t) + \sum_{w'} \beta_{w'}}$$

Out of these three terms, only the first one is dense, all others are sparse. Hence, if we knew the sum over \(t\) for all three summands we could design a sampler which first samples which of the blocks is relevant and then which topic within each of these blocks. This is efficient since the first term doesn’t actually depend on \(n(t,w)\) or \(n(t,d)\) but rather only on \(n(t)\) which can be updated efficiently after each new topic assignment. In other words, we are able to update the dense term in \(O(1)\) operations after each sampling step and the remaining terms are all sparse. This trick gives a 10-50 times speedup in the sampler over a dense representation.
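To make the bucket idea concrete, here is a rough sketch that splits the product \((\alpha_t + n^*(t,d))(n^*(t,w) + \beta_w)\) over the common denominator into a dense smoothing bucket, a bucket that is nonzero only for topics in the document, and a bucket that is nonzero only for topics in which the word occurs. The function names and the dictionary-based sparse counts are my own choices for illustration; this is not the released implementation, and a real sampler would cache the bucket totals instead of recomputing them.

```python
import random

def bucket_masses(alpha, beta_w, beta_sum, n_t, n_td, n_tw):
    """Split the unnormalized p(t|d,w) into three buckets.

    alpha: list of alpha_t; beta_w: prior of the current word;
    beta_sum: sum of beta over the dictionary; n_t: list of n*(t);
    n_td, n_tw: sparse dicts {topic: count} for n*(t,d) and n*(t,w).
    """
    denom = [n + beta_sum for n in n_t]
    # Dense bucket: depends on the counts only through n*(t).
    smooth = {t: alpha[t] * beta_w / denom[t] for t in range(len(alpha))}
    # Sparse bucket: nonzero only for topics present in the document.
    doc = {t: c * (n_tw.get(t, 0) + beta_w) / denom[t] for t, c in n_td.items()}
    # Sparse bucket: nonzero only for topics in which the word occurs.
    word = {t: alpha[t] * c / denom[t] for t, c in n_tw.items()}
    return smooth, doc, word

def sample_topic(buckets, rng=random):
    """Pick a bucket in proportion to its mass, then a topic inside it."""
    totals = [sum(b.values()) for b in buckets]
    r = rng.random() * sum(totals)
    for bucket, total in zip(buckets, totals):
        if r >= total:
            r -= total
            continue
        for t, mass in bucket.items():
            r -= mass
            if r < 0:
                return t
    return next(iter(buckets[0]))  # only reached through round-off

alpha = [0.1, 0.1, 0.1, 0.1]
n_t = [10, 0, 5, 3]
n_td = {0: 2, 2: 1}
n_tw = {0: 3, 3: 1}
buckets = bucket_masses(alpha, 0.01, 0.5, n_t, n_td, n_tw)
print(sample_topic(buckets, random.Random(0)))
```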

To combine several machines we have two alternatives: one is to perform one sampling pass over the data and then reconcile the samplers. This was proposed by Newman, Asuncion, Smyth, and Welling (JMLR 2009). While the approach proved to be feasible, it has a number of disadvantages. It only exercises the network while the CPU sits idle and vice versa. Secondly, a deferred update makes for slower mixing. Instead, one can simply have each sampler communicate with a distributed central storage continuously. In a nutshell, each node sends the differential to the global statekeeper and receives from it the latest global value. The key point is that this occurs asynchronously and moreover that we are able to decompose the state over several machines such that the available bandwidth grows with the number of machines involved. More on such distributed schemes in a later post.

Bloom Filters

Bloom filters are one of the really ingenious and simple building blocks for randomized data structures. A great summary is the paper by Broder and Mitzenmacher. In this post I will briefly review its key ideas since it forms the basis of the Count-Min sketch of Cormode and Muthukrishnan, it will also be necessary for an accelerated version of the graph kernel of Shervashidze and Borgwardt, and finally, a similar structure will be needed to compute data streams over time for a real-time sketching service.

At its heart a Bloom filter uses a bit vector of length N and a set of k hash functions mapping arbitrary keys x into their hash values \(h_i(x) \in [1 .. N]\) where \(i \in \{1 .. k\}\) denotes the hash function. The Bloom filter allows us to perform approximate set membership tests where we have no false negatives but we may have a small number of false positives.

Initialize(b): Set all \(b[i] = 0\)

Insert(b,x): For all \(i \in \{1 .. k\}\) set \(b[h_i(x)] = 1\)

Query(b, x): Return true if \(b[h_i(x)] = 1\) for all \(i \in \{1 .. k\}\), false otherwise

Furthermore, the union of two sets is easily obtained by a bit-wise OR of the corresponding Bloom filters. A bit-wise AND yields a filter for the intersection, although it may produce more false positives than a filter built directly from the intersection.
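The three operations fit into a few lines of code. The sketch below makes a few assumptions of its own: the hash family is BLAKE2b with the index i as salt, the bit vector is stored in a Python integer, and positions run from 0 to N-1 rather than 1 to N.

```python
import hashlib

class BloomFilter:
    def __init__(self, n_bits: int, n_hashes: int):
        self.n_bits = n_bits
        self.n_hashes = n_hashes
        self.bits = 0  # Initialize(b): all bits are zero

    def _positions(self, x: str):
        # Hypothetical hash family h_i(x): BLAKE2b salted with the index i.
        for i in range(self.n_hashes):
            digest = hashlib.blake2b(x.encode(), digest_size=8,
                                     salt=i.to_bytes(8, "big")).digest()
            yield int.from_bytes(digest, "big") % self.n_bits

    def insert(self, x: str) -> None:
        for pos in self._positions(x):
            self.bits |= 1 << pos

    def query(self, x: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(x))

    def union(self, other: "BloomFilter") -> "BloomFilter":
        # Only meaningful if both filters share N and the hash functions.
        out = BloomFilter(self.n_bits, self.n_hashes)
        out.bits = self.bits | other.bits
        return out

fruit = BloomFilter(n_bits=1024, n_hashes=3)
fruit.insert("apple")
fruit.insert("pear")
print(fruit.query("apple"))  # inserted keys are always found
```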

It is clear that if we inserted x into the Bloom filter the query will return true, since all relevant bits in b are 1. To analyze the probability of a false positive take the probability of a bit being 1. After inserting m items using k hash functions on a range of N we have

$$\Pr(b[i] = \mathrm{TRUE}) = 1 - (1 - \frac{1}{N})^{k m} \approx 1 - e^{-\frac{km}{N}}$$

For a false positive to occur we need to have all k bits associated with the hash functions to be 1. Ignoring the fact that the hash functions might collide the probability of false positives is given by

$$p \approx (1 - e^{-\frac{km}{N}})^k$$

Taking derivatives with respect to \(\frac{km}{N}\) shows that the minimum is obtained for \(\log 2\), that is \(k = \frac{N}{m} \log 2\).
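As a quick sanity check of these formulas, the following snippet evaluates the approximate false positive rate and the optimal number of hash functions. The function names are mine, and the numbers are only as good as the approximation above.

```python
import math

def false_positive_rate(n_bits: int, n_items: int, n_hashes: int) -> float:
    """Approximate p = (1 - exp(-k m / N))^k."""
    return (1.0 - math.exp(-n_hashes * n_items / n_bits)) ** n_hashes

def optimal_hashes(n_bits: int, n_items: int) -> float:
    """k = (N / m) log 2, before rounding to an integer."""
    return n_bits / n_items * math.log(2)

# With 10 bits per item the optimum is close to 7 hash functions.
print(optimal_hashes(10_000, 1_000))
print(false_positive_rate(10_000, 1_000, 7))
```

With 10 bits per item this gives a false positive rate of a bit under 1%, and using noticeably fewer or more than 7 hash functions makes it worse.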

One of the really nice properties of the Bloom filter is that all memory is used to store the information about the set rather than an index structure storing the keys of the items. The downside is that it is impossible to read out b without knowing the queries. Also note that it is impossible to remove items from the Bloom filter once they’ve been inserted. After all, we do not know whether some of the bits might have collided with another key, hence setting the corresponding bits to 0 would cause false negatives. 

Real simple covariate shift correction

Imagine you want to design some algorithm to detect cancer. You get data of healthy and sick people; you train your algorithm; it works fine giving you high accuracy and you conclude that you’re ready for a successful career in medical diagnostics.

Not so fast …

Many things could go wrong. In particular, the distributions that you work with for training and those in the wild might differ considerably. This happened to an unfortunate startup I had the opportunity to consult for many years ago. They were developing a blood test for a disease that affects mainly older men and they’d managed to obtain a fair amount of blood samples from patients. It is considerably more difficult, though, to obtain blood samples from healthy men (mainly for ethical reasons). To compensate for that, they asked a large number of students on campus to donate blood and they performed their test. Then they asked me whether I could help them build a classifier to detect the disease. I told them that it would be very easy to distinguish between both datasets with probably near perfect accuracy. After all, the test subjects differed in age, hormone level, physical activity, diet, alcohol consumption, and many more factors unrelated to the disease. This was unlikely to be the case with real patients: Their sampling procedure had caused an extreme case of covariate shift that couldn’t be corrected by conventional means. In other words, training and test data were so different that nothing useful could be done and they had wasted significant amounts of money. 

In general the situation is not quite so dire. Assume that we want to estimate some dependency \(p(y|x)\) for which we have labeled data \((x_i, y_i)\). Alas, the observations \(x_i\) are drawn from some distribution \(q(x)\) rather than the ‘proper’ distribution \(p(x)\). If we adopt a risk minimization approach, that is, if we want to solve

$$\mathrm{minimize}_{f} \frac{1}{m} \sum_{i=1}^m l(x_i, y_i, f(x_i)) + \frac{\lambda}{2} \|f\|^2$$

we will need to re-weight each instance by the ratio of probabilities that it would have been drawn from the correct distribution, that is, we need to reweight things by \(\frac{p(x_i)}{q(x_i)}\). This is the ratio of how frequently an instance would occur under the correct distribution \(p\) vs. how frequently it occurs under the sampling distribution \(q\). It is sometimes also referred to as the Radon-Nikodym derivative. Such a method is called importance sampling and the following derivation shows why it is valid:

$$\int f(x) dp(x) = \int f(x) \frac{dp(x)}{dq(x)} dq(x)$$

Alas, we do not know \(\frac{dp(x)}{dq(x)}\), so before we can do anything useful we need to estimate the ratio. Many methods are available, e.g. some rather fancy operator theoretic ones which try to recalibrate the expectation operator directly using a minimum-norm or a maximum entropy principle. However, there exists a much more pedestrian, yet quite effective approach that will give almost as good results: logistic regression. 

After all, we know how to estimate probability ratios. This is achieved by learning a classifier to distinguish between data drawn from \(p\) and data drawn from \(q\). If it is impossible to distinguish between the two distributions then it means that the associated instances are equally likely to come from either one of the two distributions. On the other hand, any instances that can be well discriminated should be significantly over/underweighted accordingly. For simplicity’s sake assume that we have an equal number of instances from both distributions, denoted by \(x_i \sim p(x)\) and \(x_i' \sim q(x)\) respectively. Now denote by \(z_i\) labels which are 1 for data drawn from \(p\) and -1 for data drawn from \(q\). Then the probability in a mixed dataset is given by

$$p(z=1|x) = \frac{p(x)}{p(x) + q(x)}$$

Hence, if we use a logistic regression approach which yields \(p(z=1|x) = \frac{1}{1 + e^{-f(x)}}\), it follows (after some simple algebra) that 

$$\frac{p(x)}{q(x)} = \frac{p(z=1|x)}{p(z=-1|x)} = e^{f(x)}.$$

Now we only need to solve the logistic regression problem

$$\mathrm{minimize}_f \frac{1}{2m} \sum_{(x,z)} \log [1 + \exp(-z f(x))] + \frac{\lambda}{2} \|f\|^2$$

to obtain \(f\). Subsequently we can use \(e^{f(x_i)}\) as covariate shift correction weights in training our actual classifier. The good news is that we can use an off-the-shelf tool such as logistic regression to deal with a decidedly nonstandard estimation problem. 
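Here is a toy version of the recipe in plain Python. Everything specific in it is an assumption made for illustration: the data are one-dimensional, \(p\) and \(q\) are unit-variance Gaussians with means 1 and 0, \(f(x) = ax + b\) is linear, and the optimizer is plain gradient descent. For this pair of Gaussians the true log-ratio is \(x - 1/2\), so the fit should land near \(a = 1\) and \(b = -1/2\).

```python
import math
import random

def fit_logistic(xs, zs, lam=1e-3, lr=1.0, steps=300):
    """Fit f(x) = a*x + b by gradient descent on the regularized logistic loss."""
    a = b = 0.0
    n = len(xs)
    for _ in range(steps):
        grad_a, grad_b = lam * a, 0.0
        for x, z in zip(xs, zs):
            # d/df log(1 + exp(-z f)) = -z / (1 + exp(z f))
            g = -z / (1.0 + math.exp(z * (a * x + b)))
            grad_a += g * x / n
            grad_b += g / n
        a -= lr * grad_a
        b -= lr * grad_b
    return a, b

rng = random.Random(0)
m = 1000
target = [rng.gauss(1.0, 1.0) for _ in range(m)]  # x_i  ~ p, label z = +1
train = [rng.gauss(0.0, 1.0) for _ in range(m)]   # x_i' ~ q, label z = -1
a, b = fit_logistic(target + train, [1] * m + [-1] * m)

# Covariate shift correction weights e^{f(x)} for the training instances.
weights = [math.exp(a * x + b) for x in train]
print(a, b)
```

The weights would then multiply the per-instance losses when training the actual classifier on the data drawn from \(q\).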

Graphical Models for the Internet

Here are a few tutorial slides I prepared with Amr Ahmed for WWW 2011 in Hyderabad next week. They describe in fairly basic (and in the end rather advanced) terms how one might use graphical models for the amounts of data available on the internet. Comments and feedback are much appreciated. 

PDF Keynote

Memory Latency, Hashing, Optimal Golomb Rulers and Feistel Networks

In many problems involving hashing we want to look up a range of elements from a vector, e.g. of the form \(v[h(i,j)]\) for arbitrary \(i\) and for a range of \(j \in \{1, \ldots, n\}\) where \(h(i,j)\) is a hash function. This happens e.g. for multiclass classification, collaborative filtering, and multitask learning. 

While this works just fine in terms of estimation performance, traversing all values of j leads to an algorithm which is horrible in terms of memory access patterns. Modern RAM chips are much faster (over 10x) when it comes to reading values in sequence than when carrying out random reads. Furthermore, random access destroys the benefit of a cache. This leads to algorithms which are efficient in terms of their memory footprint, however, they can be relatively slow. One way to address this is to bound the range of \(h(i,j)\) for different values of j. Here are some ways of how we could do this:

  1. Decompose \(h(i,j) = h(i) + j\). This is computationally very cheap, it has good sequential access properties but it leads to horrible collisions should there ever be two \(i\) and \(i'\) for which \(|h(i) - h(i')| \leq n\). 
  2. Decompose \(h(i,j) = h(i) + h'(j)\) where \(h'(j)\) has a small range of values.
    This is a really bad idea since now we have a nontrivial probability of collision as soon as the range of \(h'(j)\) is less than \(n^2\) due to the birthday paradox. Moreover, for adjacent values \(h(i)\) and \(h(i')\) we will get many collisions.
  3. Decompose \(h(i,j) = h(i) + g(j)\) where \(g(j)\) is an Optimal Golomb Ruler.
    The latter is an increasing sequence of integers for which any pairwise distance occurs exactly once. In other words, the condition \(g(a) - g(b) = g(c) - g(d)\) implies that \(a = c\) and \(b = d\). John Langford proposed this to address the problem. In fact, it solves our problem since there are a) no collisions for a fixed \(i\) and b) for neighboring values \(h(i)\) and \(h(i')\) we will get at most one collision (due to the Golomb ruler property). Alas, this only works up to \(n=26\) since finding an Optimal Golomb Ruler is hard (it is currently unknown whether it is actually NP hard).
  4. An alternative that works for larger n and that is sufficiently simple to compute is to use cryptography. After all, all we want is that the hash function \(h'(j)\) has small range and that it doesn’t have any self collisions or any systematic collisions. We can achieve this by encrypting j using the key i to generate an encrypted message of N possible values. In other words we use
    $$h(i,j) = h(i) + \mathrm{crypt}(j|i,N)$$
    Since it is an encryption of j, the mapping is invertible and we won’t have collisions for a given value of i. Furthermore, for different i the encodings will be uncorrelated (after all, i is the key). Finally, we can control the range \(N>n\) simply by choosing the encryption algorithm. In this case the random memory access is of bounded range, hence the CPU cache will not suffer from too many misses.
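The Golomb ruler property used in option 3 is easy to check by brute force. The sketch below uses the optimal ruler with 5 marks, (0, 1, 4, 9, 11), and counts how many positions two shifted copies of it share. The helper names are mine.

```python
from itertools import combinations

def is_golomb_ruler(marks) -> bool:
    """True if every pairwise distance between marks occurs exactly once."""
    diffs = [b - a for a, b in combinations(sorted(marks), 2)]
    return len(diffs) == len(set(diffs))

def shared_positions(marks, offset: int) -> int:
    """Collisions between h(i) + g(j) and h(i') + g(j') when h(i') - h(i) = offset."""
    return len(set(marks) & {offset + g for g in marks})

ruler = [0, 1, 4, 9, 11]  # optimal Golomb ruler with 5 marks
print(is_golomb_ruler(ruler))
print(max(shared_positions(ruler, d) for d in range(1, 12)))
```

For any nonzero offset the two copies overlap in at most one position, whereas the plain sequence 0, 1, 2, 3 of option 1 overlaps in up to three.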

A particularly nice algorithm is the Feistel cipher. It works as follows: define the iterative map

$$f(x,y) = (y, x \mathrm{XOR} h(y))$$

Here \(h\) is a hash function. After 4 iterations \((x,y) \to f(x,y)\) we obtain an encryption of \((x,y)\). Now use \(x=i\) and \(y = j\) to obtain the desired result. Basically we are trading off memory latency with computation (which is local).
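A small sketch of such a map follows. It is one possible reading of the construction rather than a reference implementation: j is split into two halves that play the role of \((x,y)\), the key i is mixed into the round hash, and the round hash itself (BLAKE2b truncated to a few bits) is a placeholder. Whatever round hash is used, the result is a permutation of \(\{0, \ldots, N-1\}\) with \(N = 2^{2b}\) for halves of b bits.

```python
import hashlib

def round_hash(key: int, rnd: int, half: int, bits: int) -> int:
    """Hypothetical round function h: hash (key, round, half) down to `bits` bits."""
    data = f"{key}:{rnd}:{half}".encode()
    digest = hashlib.blake2b(data, digest_size=8).digest()
    return int.from_bytes(digest, "big") & ((1 << bits) - 1)

def crypt(j: int, i: int, half_bits: int = 4, rounds: int = 4) -> int:
    """Permute j in [0, 2**(2*half_bits)) using i as the key."""
    mask = (1 << half_bits) - 1
    x, y = j >> half_bits, j & mask
    for rnd in range(rounds):
        # One Feistel round: (x, y) -> (y, x XOR h(y))
        x, y = y, x ^ round_hash(i, rnd, y, half_bits)
    return (x << half_bits) | y

# All n values of j land in a window of N = 256 slots, without collisions.
offsets = [crypt(j, i=3) for j in range(256)]
print(sorted(offsets) == list(range(256)))
```

The lookup then becomes \(v[h(i) + \mathrm{crypt}(j|i,N)]\), so all accesses for a fixed i stay within a window of N entries.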

Collaborative Filtering considered harmful

Much excellent work has been published on collaborative filtering, in particular in terms of recovering missing entries in a matrix. The Netflix contest has contributed a significant amount to the progress in the field. 

Alas, reality is not quite as simple as that. Very rarely will we ever be able to query a user about arbitrary movies, books, or other objects. Instead, user ratings are typically expressed as preferences rather than absolute statements: a preference for Die Hard given a generic set of movies only tells us that the user appreciates action movies; however, a preference for Die Hard over Terminator or Rocky suggests that the user might favor Bruce Willis over other action heroes. In other words, the context of user choice is vital when estimating user preferences. 

Hence if we attempt to estimate scores \(s_{ui}\) of user \(u\) regarding item \(i\) it is important to use the context within which the ratings have been obtained. For instance, if we are given a session of items \((i_1, \ldots, i_n)\) out of which item \(i^*\) was selected we might want to consider a logistic model of the form:

$$-\log p(i^*|i_1, \ldots, i_n) = \log \left[\sum_{j=1}^n e^{s_{u i_j}} \right] - s_{ui^*}$$

The option of no action is easy to add, simply by adding the null score \(s_{u0}\) which captures the event of no action by a user.
Shuang Hong tried out this idea to get a significant performance improvement on a number of collaborative filtering datasets. Bottom line - make sure that the problem you’re solving is actually the one that a) generated the data and b) that will help you in practice. That is, in many cases matrix completion is not the problem you want to solve, even though it might win you benchmarks.
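For concreteness, here is how the session loss above might be computed for one user. The function and argument names are placeholders, and the log-sum-exp is evaluated in the usual numerically stable way.

```python
import math

def session_loss(scores: dict, chosen, null_score=None) -> float:
    """-log p(chosen | session), given the scores s_ui of the items shown."""
    values = list(scores.values())
    if null_score is not None:
        values.append(null_score)  # optional "no action" alternative s_u0
    top = max(values)
    log_z = top + math.log(sum(math.exp(v - top) for v in values))
    return log_z - scores[chosen]

session = {"Die Hard": 2.0, "Terminator": 1.0, "Rocky": 0.5}
print(session_loss(session, "Die Hard"))
print(session_loss(session, "Die Hard", null_score=0.0))
```

Adding the null score can only increase the loss, since it adds one more alternative the user could have picked.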


Some readers might wonder why I’m writing this blog. Here’s an (incomplete) list:

  • It’s fun.
  • There are lots of fantastic blogs discussing the philosophy and big questions of machine learning (e.g. John Langford’s) but I couldn’t find many covering simple tricks of the trade.
  • Scientific papers sometimes obscure simple ideas. In the most extreme case, a paper will get rejected if the idea is presented in too simple terms (it happened to me more than once and the paper was praised once the simple parts had been obfuscated). Also, they need to come with ample evidence for why an idea works, strong theoretical guarantees and lots of experiments. This is all needed as a safeguard and it’s really really important. But it often hides the basic idea.
  • Some ideas are really cute and useful but not big enough to write a paper about. It’s pointless to write 10 pages if the idea can be fully covered in 1 page. We’d need a journal of 1 page ideas to deal with this.
  • Many practitioners are scared to pick up a paper with many equations but they might be willing to spend 10 minutes reading a blog post.

Hashing for Collaborative Filtering

This is a follow-up on the hashing for linear functions post. It’s based on the HashCoFi paper that Markus Weimer, Alexandros Karatzoglou and I wrote for AISTATS’10. It deals with the issue of running out of memory when you want to use collaborative filtering for very large problems. Here’s the setting:

Assume you want to do Netflix-style collaborative filtering, i.e. you want to estimate entries in a ratings matrix of (user, movie) pairs. A rather effective approach is to use matrix factorization, that is, to approximate \(M \approx U V^\top\) where M is the ratings matrix, U is the (tall and skinny) matrix of features for each user, stacked up, and V is the counterpart for movies. This works well for the Netflix prize since the number of users and movies is comparatively small.

In reality we might have, say, 100 million users for which we might want to recommend products. One option is to distribute all these users over several servers (similar to what a distributed hash table mapping does, e.g. for libmemcached). Alternatively, if we want to keep it all on one server, we’re facing the problem of having to store \(10^8 \cdot 100 \cdot 4 = 4 \cdot 10^{10}\) bytes, i.e. 40 GB, if we allocate 400 bytes per user (a rather small footprint: 100 dimensions at 4 bytes each). Usually this is too big for all but the biggest servers. Even worse, suppose that we have user-churn. That is, new users might be arriving while old users disappear (obviously we don’t know whether they’ll ever come back again so we don’t really want to de-allocate the memory devoted to them). Obviously we cannot keep adding more RAM. One possible solution is to store the data on disk and request it whenever a user arrives. This will cost us 5-10ms latency. An SSD will improve this but it still limits throughput. Moreover, it’ll require cache management algorithms to interact with the collaborative filtering code. 

Here’s a simple alternative: apply the hashing trick that we used for vectors to matrices. Recall that in the exact case we compute matrix entries via

$$M[i,j] = \sum_{k=1}^{K} U[i,k] V[j,k]$$

Now denote by \(h_u\) and \(h_v\) hash functions mapping pairs of integers to a given hash range \([1 \ldots N]\). Moreover, let \(\sigma_u\) and \(\sigma_v\) be corresponding Rademacher hash functions which return a binary hash in \(\{\pm 1\}\). Now replace the above sum via

$$M[i,j] = \sum_{k=1}^{K} u[h_u(i,k)] \sigma_u(i,k) v[h_v(j,k)] \sigma_v(j,k)$$

What happened is that now all access into U is replaced by access into a vector u of length N (and the same holds true for V). Why does this work: firstly, we can prove that if we construct u and v from U and V via

$$u[k] = \sum_{h_u(i,j) = k} \sigma_u(i,j) U[i,j] \text{ and } v[k] = \sum_{h_v(i,j) = k} \sigma_v(i,j) V[i,j]$$

then the approximate version of \(M[i,j]\) converges to the correct \(M[i,j]\) with variance \(O(1/N)\) and moreover that the estimate is unbiased. Getting the exact expressions is a bit tedious and they’re described in the paper. In practice, things are even better than this rate: since we never use U and V but always u and v we simply optimize with respect to the compressed representation. 
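A short sketch of the hashed lookup follows. The hash and sign functions built from BLAKE2b are stand-ins for \(h_u, h_v, \sigma_u, \sigma_v\), indices start at 0, and the tiny matrices are made up; the paper's experiments do not use this code.

```python
import hashlib

def _digest(tag: str, a: int, b: int) -> int:
    data = f"{tag}:{a}:{b}".encode()
    return int.from_bytes(hashlib.blake2b(data, digest_size=8).digest(), "big")

def h(tag: str, a: int, b: int, n: int) -> int:
    """Hypothetical hash of the pair (a, b) into a bucket in [0, n)."""
    return _digest(tag, a, b) % n

def sigma(tag: str, a: int, b: int) -> int:
    """Hypothetical Rademacher hash returning -1 or +1."""
    return 1 if _digest("sign-" + tag, a, b) & 1 else -1

def compress(matrix, tag: str, n: int):
    """Build u (or v) from U (or V) by signed accumulation into n buckets."""
    vec = [0.0] * n
    for i, row in enumerate(matrix):
        for k, value in enumerate(row):
            vec[h(tag, i, k, n)] += sigma(tag, i, k) * value
    return vec

def predict(u, v, i: int, j: int, dims: int) -> float:
    """Hashed estimate of M[i, j] using only the vectors u and v."""
    return sum(u[h("u", i, k, len(u))] * sigma("u", i, k)
               * v[h("v", j, k, len(v))] * sigma("v", j, k)
               for k in range(dims))

U = [[1.0, 2.0], [3.0, 4.0]]   # made-up user factors
V = [[0.5, -1.0], [2.0, 0.0]]  # made-up movie factors
u, v = compress(U, "u", 1 << 16), compress(V, "v", 1 << 16)
print(predict(u, v, 0, 1, dims=2))
```

With so few entries and a large hash range there are almost certainly no collisions, and then the estimate reproduces \(\sum_k U[i,k] V[j,k]\) exactly; the approximation only kicks in once N is small relative to the number of parameters.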

One of the advantages of the compressed representation is that we never really need to have any knowledge of all the rows of U. In particular, rather than mapping user IDs to rows in U we simply use the user ID as the hash key. If a new user appears, memory is effectively allocated to the new user by means of the hash function. If a user disappears, his parameters will simply get overwritten if we perform stochastic gradient descent with respect to the u and v vectors. The same obviously holds for movies or any other entity one would like to recommend. 

Bottom line - we now can have fast (in memory) access to user parameters regardless of the number of users. The downside is that the latency is still quite high: remember that the hash function requires us to access \(u[h_u(i,k)]\) for many different values of k. This means that each access in k is a cache miss, i.e. it’ll cost us 100-200ns RAM latency rather than the 10-20ns we’d pay for burst reads. How to break this latency barrier is the topic of one of the next posts.

Priority Sampling

Tamas Sarlos pointed out a much smarter strategy on how to obtain a sparse representation of a (possibly dense) vector: Priority Sampling by Duffield, Lund and Thorup (Journal of the ACM 2006).  The idea is quite ingenious and (surprisingly so) essentially optimal, as Mario Szegedy showed. Here’s the algorithm:

For each \(x_i\) compute a priority \(p_i = \frac{x_i}{a_i}\) where \(a_i \sim U(0, 1]\) is drawn from a uniform distribution. Denote by \(\tau\) the (k+1)-th largest such priority. Then pick all k indices i which satisfy \(p_i > \tau\) and assign them the value \(s_i = \mathrm{max}(x_i, \tau)\). All other coordinates are set to \(s_i = 0\).

This provides an estimator with the following properties:

  1. The total variance is no larger than that of the best unbiased estimator which uses k-1 samples, i.e. we need at most one extra sample to match the optimum.
  2. The entries \(s_i\) satisfy \(\mathbf{E}[s_i] = x_i\)
  3. The covariance vanishes, i.e. \(\mathbf{E}[s_i s_j] = x_i x_j\) for \(i \neq j\).

Note that we assumed that all \(x_i \geq 0\). Otherwise simply apply the same algorithm to \(|x_i|\) and then return signed versions of the estimate.
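The algorithm is only a few lines long. Here is a minimal sketch for nonnegative inputs; the function name and the example vector are made up for illustration.

```python
import random

def priority_sample(x, k: int, rng=random):
    """Return a k-sparse unbiased estimate s of the nonnegative vector x."""
    if k >= len(x):
        return list(x)
    # a_i is uniform on (0, 1], so the priorities x_i / a_i are finite.
    priorities = [xi / (1.0 - rng.random()) for xi in x]
    order = sorted(range(len(x)), key=lambda i: priorities[i], reverse=True)
    tau = priorities[order[k]]  # the (k+1)-th largest priority
    s = [0.0] * len(x)
    for i in order[:k]:
        s[i] = max(x[i], tau)
    return s

x = [10.0, 5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
print(priority_sample(x, k=4, rng=random.Random(0)))
```

Averaging many such samples recovers x, so sums over arbitrary subsets of coordinates can be estimated from the k retained entries alone.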