Adventures in Data Land
Hashing for Collaborative Filtering

This is a follow-up on the hashing for linear functions post. It’s based on the HashCoFi paper that Markus Weimer, Alexandros Karatzoglou and I wrote for AISTATS’10. It deals with the issue of running out of memory when you want to use collaborative filtering for very large problems. Here’s the setting:

Assume you want to do Netflix-style collaborative filtering, i.e. you want to estimate entries in a ratings matrix of (user, movie) pairs. A rather effective approach is to use matrix factorization, that is, to approximate \(M = U^\top V\) where M is the ratings matrix, U is the (tall and skinny) matrix of features for each user, stacked up, and V is the counterpart for movies. This works well for the Netflix prize since the number of users and movies is comparatively small.

In reality we might have, say 100 million users for which we might want to recommend products. One option is to distribute all these users over several servers (similar to what a distributed hash table mapping does, e.g. for libmemcached). Alternatively, if we want to keep it all on one server, we’re facing the problem of having to store \(10^8 \cdot 100 \cdot 4 = 4 \cdot 10^10\) bytes, i.e. 40 GB if we assume to allocate 400 Bytes per user (that’s a rather small footprint). That is 100 dimensions per user. Usually this is too big for all but the biggest servers. Even worse, suppose that we have user-churn. That is, new users might be arriving while old users disappear (obviously we don’t know whether they’ll ever come back again so we don’t really want to de-allocate the memory devoted to them). Obviously we cannot add more RAM. One possible solution is to store the data on disk and request it whenever a user arrives. This will cost us 5-10ms latency. An SSD will improve this but it still limits throughput. Moreover, it’ll require cache management algorithms to interact with the collaborative filtering code. 

Here’s a simple alternative: apply the hashing trick that we used for vectors to matrices. Recall that in the exact case we compute matrix entries via

$$M[i,j] = \sum_{k=1}^{K} U[i,k] V[j,k]$$

Now denote by \(h_u\) and \(h_v\) hash functions mapping pairs of integers to a given hash range \([1 \ldots N]\). Moreover, let \(\sigma_u\) and \(\sigma_v\) be corresponding Rademacher hash functions which return a binary hash in \(\{\pm 1\}\). Now replace the above sum via

$$M[i,j] = \sum_{k=1}^{K} u[h_u(i,k)] \sigma_u(i,k) v[h_v(j,k)] \sigma_v(j,k)$$

What happened is that now all access into U is replaced by access into a vector u of length N (and the same holds true for V). Why does this work: firstly, we can prove that if we construct u and v from U and V via

$$u[k] = \sum_{h_u(i,j) = k} \sigma_u(i,j) U[i,j] \text{ and } v[k] = \sum_{h_v(i,j) = k} \sigma_v(i,j) V[i,j]$$

then the approximate version of \(M[i,j]\) converges to the correct \(M[i,j]\) with variance \(O(1/N)\) and moreover that the estimate is unbiased. Getting the exact expressions is a bit tedious and they’re described in the paper. In practice, things are even better than this rate: since we never use U and V but always u and v we simply optimize with respect to the compressed representation. 

One of the advantages of the compressed representation is that we never really need to have any knowledge of all the rows of U. In particular, rather than mapping user IDs to rows in U we simply use the user ID as the hash key. If a new user appears, memory is effectively allocated to the new user by means of the hash function. If a user disappears, his parameters will simply get overwritten if we perform stochastic gradient descent with respect to the u and v vectors. The same obviously holds for movies or any other entity one would like to recommend. 

Bottom line - we now can have fast (in memory) access to user parameters regardless of the number of users. The downside is that the latency is still quite high: remember that the hash function requires us to access \(u[h_u(i,k)]\) for many different values of k. This means that each access in k is a cache miss, i.e. it’ll cost us 100-200ns RAM latency rather than the 10-20ns we’d pay for burst reads. How to break this latency barrier is the topic of one of the next posts.

Priority Sampling

Tamas Sarlos pointed out a much smarter strategy on how to obtain a sparse representation of a (possibly dense) vector: Priority Sampling by Duffield, Lund and Thorup (Journal of the ACM 2006).  The idea is quite ingenious and (surprisingly so) essentially optimal, as Mario Szegedy showed. Here’s the algorithm:

For each \(x_i\) compute a priority \(p_i = \frac{x_i}{a_i}\) where \(a_i \sim U(0, 1]\) is drawn from a uniform distribution. Denote by \(\tau\) the k+1 largest such priority. Then pick all k indices i which satisfy \(p_i > \tau\) and assign them the value \(s_i = \mathrm{max}(x_i, \tau)\). All other coordinates are set to \(s_i = 0\).

This provides an estimator with the following properties:

  1. The variance is no larger than that of the best k+1 sparse estimator.
  2. The entries \(s_i\) satisfy \(\mathbf{E}[s_i] = x_i\)
  3. The covariance vanishes, i.e. \(\mathbf{E}[s_i s_j] = x_i x_j\) 

Note that we assumed that all \(x_i \geq 0\). Otherwise simply apply the same algorithm to \(|x_i|\) and then return signed versions of the estimate.

    Random elements from a stream

    This is a classic trick when dealing with data streams. It shows how to draw a random element from a sequence of instances without knowing beforehand how long the sequence is and which symbols occur. 

    Let us first assume that we knew the identities of all symbols. In this case finding a random symbol would be easy. All we require is that for each symbol s we draw a random variable \(\xi_s \sim U[0,1]\) from some distribution and subsequently we choose the symbol $$s^* = \mathrm{argmin}_s \xi_s.$$ Since each s has equal probability of being associated with the smallest value \(\xi_s\) it follows that the draw is uniformly random.

    Now assume that instead of requesting a random variable \(\xi_s\) we simply compute the hash of s via \(h(s)\) and we set $$s^* = \mathrm{argmin}_s h(s).$$ For a draw from the space of hash functions this again is uniform. The advantage is that we essentially determined all the random bits when selecting \(h\) rather than at the time when we want to compute its value \(h(s)\). The second advantage is that we can now simply keep track of what is the currently smallest value of \(h(s)\) and update as we go along. We have the following algorithm:

       hstar = MAXINT 
       n = 0 
       sstar = NONE
    FOR ALL incoming s DO
       IF h(s) = hstar:
          n = n + 1
       ELSE IF h(s) < hstar:
          n = 1
          hstar = h(s)
          sstar = s
    RETURN (sstar, n)

    This algorithm will provide item counts for a random element of the sequence. If you want more than one sample, simply keep a list of the symbols with the k smallest hash values and their associated counts. Such algorithms can be used to compute the variance or other moments of a sequence. 

    Sparsifying a vector/matrix

    Sometimes we want to compress vectors to reduce memory footprint or to minimize computational cost. This occurs, e.g. when we want to replace a probability vector by a sample from it. Without loss of generality, assume that our vector v has only nonnegative entries and that its entries sum up to 1, i.e. that it is a probability vector. If not, we simply apply our sampling scheme to the following:

    There are a few strategies one could use to obtain a sparse vector s:

    Sampling uniformly at random

    For an n-dimensional vector simply use the sparsification rule: pick a random number j in [1..n]. Set the j-th coordinate in the sparse vector to  and all other terms to 0. For k draws from this distribution, simply average the draws. This scheme is unbiased but it has very high variance since there’s a very high chance we’ll miss out on the relevant components of v. This is particularly bad if v only has a small number of nonzero terms. DO NOT USE.

    Sampling according to v at random

    A much better method is to draw based on the size of the entries in v. That is, treat v as a probability distribution and draw from it. If we draw coordinate j in [1..n] then set the j-th coordinate of the sparse vector to  and all other terms to 0. As before, for k draws from this distribution, simply average. The advantage of this method is that now the entries of s are within the range [0, 1] and that moreover we hone in on the nonzero terms with high weight. But we can do better …

    Sampling according to v with replacement

    The key weakness in the above methods can be seen in the following example: for a vector of the form v = (0.99, 0.005, 0.005, 0, 0, …., 0) we would need to perform many samples with replacement from v until we even draw a single instance from the second or third coordinate. This is very time consuming. However, once we’ve drawn coordinate 1 we can inspect the corresponding value in v at no extra cost and there’s no need to redraw it. 

    Enter sampling with replacement: draw from v, remove the coordinate, renormalize the remainder to 1 and repeat. This gives us a sample drawn without replacement (see the code below which builds a heap and then adds/removes things from it while keeping stuff normalized). However, we need to weigh things differently based on how they’re drawn. Here’s what you do when drawing k terms without replacement:

    Here we initialize γ=1 and j is the index of the item drawn at the i-th step. Below is some (I hope relatively bug-free) sample code which implements such a sampler. As far as I recall, I found partial fragments of the heap class on stackoverflow but can’t quite recall where I found it and who to attribute this to. This could be made more efficient in C++ but I think it conveys the general idea.

    class arrayHeap:
        # List of terms on array
        def __init__(self, probabilities):
            self.m = len(probabilities)            # sample size
            self.b = int(math.ceil(math.log(self.m,2))) # bits
            self.limit = 1 << self.b
            self.heap = numpy.zeros(self.limit << 1)    # allocate twice the size
            # allocate the leaves
            self.heap[self.limit:(self.limit + self.m)] = probabilities
            # iterate up the tree (this is O(m))
            for i in range(self.b-1,-1,-1):
                for j in range((1 << i), (1 << (i + 1))):
                    self.heap[j] = self.heap[j << 1] + self.heap[(j << 1) + 1]
        # remove term from index (this costs O(log m) steps)
        def delete(self, index):
            i = index + self.limit
            w = self.heap[i]
            for j in range(self.b, -1, -1):
                self.heap[i] -= w
                i = i >> 1
        # add value w to index (this costs O(log m) steps)
        def add(self, index, w):
            i = index + self.limit
            for j in range(self.b, -1, -1):
                self.heap[i] += w
                i = i >> 1
        # sample from arrayHeap
        def sample(self):
            xi = self.heap[1] * numpy.random.rand()
            i = 1
            while (i < self.limit):
                i <<= 1
                if (xi >= self.heap[i]):
                    xi -= self.heap[i]
                    i += 1
            return (i - self.limit)
    #Input: normalized probabilities, sample size
    def sampleWithoutReplacement(p, n):
        heap = arrayHeap(p)
        samples = numpy.zeros(n, dtype=int) # result vector
        for j in range(n):
            samples[j] = heap.sample()
        return samples
    #Input: dense matrix p, resampling rate n
    def sparsifyWithoutReplacement(p, n=1):
        (nr, nc) = p.shape
        res = scipy.sparse.lil_matrix(p.shape) # allocate sparse container
        for i in range(nr):
            tmp = sampleWithoutReplacement(p[i,:], n)
            weight = 1.0            # we start with full weight first
            for j in range(n):
                res[i,tmp[j]] = weight + (n-j - 1.0) * p[i,tmp[j]]
                weight -= p[i,tmp[j]]
        res *= (1/float(n))
        return res
    Log-probabilities, semirings and floating point numbers

    Here’s a trick/bug that is a) really well known in the research community, b) lots of beginners get it wrong nonetheless, c) simple unit tests may not detect it and d) it may be a really fatal bug in your code. You can even find it in large scale machine learning toolboxes.

    Suppose you want to do mixture clustering and you happily compute p(x|y) and p(y), say by a mixture of Gaussians. Quite often you’ll need access to p(y|x) which can be computed via

    Let’s assume that your code runs well in 2D but now you try it in 100 dimensions and it fails with a division by zero error. What’s going on? After all, your code is algebraically correct. Most likely you ignored the fact that floating point numbers have only a fixed precision and by exponentiating the argument of the Gaussian (recall, we have a normalization that is exponential in the number of dimensions) you encountered numerical underflow. What happened is that you were trying to store the relevant information in the exponent rather than the mantissa of a floating point number. On single precision you have 8 bit for the exponent and 23 for the mantissa and you just ran out of storage. 

    Here’s how you can pull things back into the mantissa - simply store log probabilities and operate with them. Hence instead of + and x you should use ‘log +’ and +.

    In this case you will not get numerical underflow when adding probabilities since the argument in the log is going to be O(1) (we have at least one term which is exp(0) and the remaining terms are smaller than 1), or at least not due to running out of precision in the exponent.

    In more general terms, what happened is that we replaced the operations ‘+’ and ‘x’ by two operations ‘log +’ and ‘+’ which act just in the same way as addition and multiplication. Aji and McEliece formalized this in their paper on the Generalized Distributive Law. Systems that satisfy these operations are called commutative semirings. Some examples are:

    • Set union (+) and intersection (x)
    • The tropical semiring which uses min (+) and + (x)
    • Boolean AND (x) and OR (+)
    • Plain old calculus with addition (+) and multiplication (x)
    • The log + semiring with log + (+) and + (x)

    Replacing these symbols in well known algorithms such as dynamic programming gives the forward-backward algorithm, shortest path calculations and others, but this is a story for another day.

    Parallel Stochastic Gradient Descent

    Here’s the problem: you’ve optimized your stochastic gradient descent library but the code is still not fast enough. When streaming data off a disk/network you cannot exceed 100 MB/s, so processing a 1TB of data will take about 3-5 hours on a single machine. And classical stochastic gradient descent is sequential even in the context of multicore machines. Before looking at ways to speed up the algorithm let’s look at the basic SGD template:

    Here f(x) is the loss function and Ω[x] is the regularizer. This procedure is entirely sequential, so how to accelerate it?


    One option is to execute the above loop in each core independently while working with a common shared weight vector x. This means that if we update x in a round-robin fashion we will have a delay of k-1 when using k cores. The delay is due to the time it takes between seeing an instance of f and when we can actually apply the update to x. The beauty of this approach is that one can prove convergence (thanks to Marty Zinkevich and John Langford) without losing anything in the rates relative to the single core setting. The intuition behind it is that as we converge optimization becomes more and more an averaging procedure and in this case a small delay does not hurt at all.

    Multiple Machines

    The Achilles heel of the above algorithm is that it requires extremely low latency between the individual processing nodes. While this is OK on a computer or on a GPU, it will fail miserably on a network of computers since we may have many miliseconds latency between the nodes. This suggests an even simpler algorithm for further parallelization:

    1. Overpartition the data into k blocks for k clusters (i.e. for each cluster simply randomly draw a fraction c = O(m/k) of the entire dataset). 
    2. Now perform stochastic gradient descent on each machine separately with constant learning rate.
    3. Average the solutions between different machines. 

    Surprisingly enough this method can be shown to converge and give optimal speedup. A paper describing the proof for this simple algorithm is currently under review (thanks to Marty Zinkevich, Markus Weimer and Lihong Li). 

    Hashing for linear functions

    This is the first of a few posts on hashing. It’s an incredibly powerful technique when working with discrete objects and sequences. And it’s also idiot-proof simple. I decided to post it after listening to a talk this morning where the speaker proposed something really complicated for what should be very simple.

    A hash function is (in its ideal CS version) a mapping h from a domain X into a set [0 … N-1] of integers such that h(x) is essentially random. A fast implementation is e.g. Murmur hash. Note that as long as the hash is good it does not matter which one you use (I have received the latter question at least twice from referees in the past. It’s about as meaningful as asking which random number generator was used for sampling).

    When we want to compute inner products of the form

    we might run into a big problem - whenever the feature map is too high-dimensional we will not have enough memory to store w. A second problem is that we might have to store a dictionary for computing the features of x, e.g. when we have a bag of words representation. It is quite likely that the dictionary will take up much more memory than the actual parameter vector w. On top of that, we need to write code to generate the dictionary. Instead of that, let’s be lazy and do the following:

    Here h is a hash function with range small enough to fit w into memory and σ is a binary (+1, -1) hash function. This looks pretty simple. It’s essentially the sketch that Charikar, Chen and Farrach-Colton suggested in 2003 for data streams. But it’s also very powerful. In particular one can show the following:

    1. The inner products are unbiased relative to the original linear function. 
    2. The variance decreases with 1/N. 
    3. No dictionary is needed. The hash function acts as our lookup device for the index. This is a massive space saver and it also is typically much faster. 
    4. We can use this even when we don’t know the full dictionary at the time we start running an online solver. E.g. whenever we get new users we don’t need to allocate more memory for them. 
    5. This can easily be extended to approximate matches. Say I have the string ‘hash’. Then I can simply insert the strings ‘*ash’, ‘h*sh’, ‘ha*h’, ‘has*’ to represent approximate matches. 
    6. We can also extend it to substrings - simply add them to the hash table. This yields e.g. ‘sub’, ‘ubs’, ‘bst’, ‘str’, ‘tri’, ‘rin’, ‘ing’, ‘ngs’ when we are interested in trigrams. In general for all n grams the cost is linear in n and the length of the document. 
    7. Adding some weighting to the features is trivial. Just do it. Go wild. Add meaningful substrings. Personalize for users. Add tasks. Use for social networks. Use collaborative filtering (more on this later).

    To get started have a look at John Langford’s Vowpal Wabbit or James Petterson’s Stream. The theory can be found in this AISTATS and this ICML paper (note: the proof of the tail bound in the ICML paper is broken).

    P.S.: This is a repost. Tumblr seems to have screwed up management of some of the post IDs.

    In praise of the Second Binomial Formula

    Here’s a simple trick you can use to compute pairs of distances: use the second binomial formula and a linear algebra library. These problems occur in RBF kernel computations (this is how it’s implemented e.g. in the Elefant library) and in k-nearest neighbor and EM clustering algorithms. 

    The problem: given sets X and Z compute the matrix of pairwise squared distances.

    Simple solution: use a double for loop, take differences, square and be happy. This is slow for a number of reasons - it isn’t optimized for the architecture, it requires you to take differences, and it also means that you aren’t using memory blocking for it. Plus it’d be quite some work to write a multithreaded code for it.

    Fast solution: use the second binomial formula to decompose the distance into norms of vectors in X and Z and an inner product between them.

    To compute the latter you can simply use BLAS, or more specifically SGEMM or DGEMM for single or double precision respectively.  A quad-core Xeon peaks out at 100GFlops for DGEMM, so computing all pairwise distances for |X| = |Z| = 100k and 1000 dimensions should take in the order of 100-200 seconds (I’m throwing in a 2x penalty for the overhead of computing distances). On a GPU this would be even faster. If you compute it one row at a time you should see about a 10x penalty in speed since DGEMV is much slower than DGEMM.

    For sparse matrices you could use an inverted index (Jon Baxter suggested this), or you could be lazy and simply let a sparse matrix linear algebra library such as OSKI or Intel MKL do the work for you.

    Lazy updates for generic regularization in SGD

    Yesterday I wrote about how to do fast stochastic gradient descent updates for quadratic regularization. However, there are lots more regularizers which one would want to use for sparse updates of high-dimensional parameter vectors. In general the problem looks like this:

    Whenever the gradients in stochastic gradient descent are sparse and whenever x is very high dimensional it would be very wasteful to use the standard SGD updates. You could use very clever data structures for storing vectors and updating. For instance John Duchi and Yoram Singer wrote a beautiful paper to address this. Or you could do something really simple … 

    Novi Quadrianto and I ran into this problem when trying to compute an optimal tiering of webpages where we had a few billion pages that needed allocating over a bunch of storage systems. Each update in this case only dealt with a dozen or so pages, hence our gradients were very very sparse. So we used this trick:

    1. Instead of storing x, also store a time-stamp when a particular coordinate was addressed last. 
    2. In between updates caused by f, all updates for a coordinate look as follows:
    3. This is a difference equation. We can approximate this by a differential equation and accumulate over the coordinate changes while no updates from f but rather just the regularizer occur. So we get the following updates
    4. Now all you need to do is solve the differential equation once and you’re done. If you don’t want to keep the partial sums over η around (this can be costly if the updates are infrequent) you could use the approximation
    5. This saves a few GB whenever the updates only occur every billion steps …

    This algorithm would obviously work for l1 programming. But it’ll work just as well for any other regularizer you could think of. 

    Easy kernel width choice

    This is an idea that was originally put forward by Bernhard Schölkopf in his thesis: Assume you have an RBF (radial basis function) kernel and you want to know how to scale it. Recall that such a kernel is given by

    We want that the argument of this function is O(1) for typical pairs of instances x and x’. Bernhard proposed to look at the dimensionality of x and rescale accordingly. This is a great heuristic. But it ignores correlation between the coordinates. A much simpler trick is to pick, say 1000 pairs (x,x’) at random from your dataset, compute the distance of all such pairs and take the median, the 0.1 and the 0.9 quantile. Now pick λ to be the inverse any of these three numbers. With a little bit of crossvalidation you will figure out which one of the three is best. In most cases you won’t need to search any further.