Transposing a Spark RDD
I have been using Spark quite a lot for the last year, at first through the Scala interface, but lately more through the Python one.
In one of my recent projects, I received a dataset containing expression profiles of chemical compounds on genes. More precisely, the dataset I got had this data transposed, i.e., genes versus compounds, which is not a handy format to work with. I can load the original data into an RDD, but then I have to transpose that RDD.
I looked on the web but found no complete solution. Recently, a similar question came up on the Spark mailing list, so I thought it was about time I posted my approach.
This is the code for the function that transposes an RDD and returns a new RDD. There are other approaches, and there is room for optimisation as well. But this already gets the work done.
import numpy as np

def rddTranspose(rdd):
    rddT1 = rdd.zipWithIndex().flatMap(lambda xi: [(xi[1], j, e) for (j, e) in enumerate(xi[0])])
    rddT2 = rddT1.map(lambda ije: (ije[1], (ije[0], ije[2]))).groupByKey().sortByKey()
    rddT3 = rddT2.map(lambda jx: sorted(list(jx[1]), key=lambda ie: ie[0]))
    rddT4 = rddT3.map(lambda x: [e for (i, e) in x])
    return rddT4.map(lambda x: np.asarray(x))
This code converts the rows to numpy arrays in the return statement, so you need to import numpy as np. Strictly speaking, this step is not necessary, but it does make subsequent random access inside the rows faster. Note as well that the procedure only works when one row (one element of the original RDD as well as of the transposed RDD) fits into the JVM memory of the workers.
I left out the comments in my code, to keep it a little exciting for you…
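For those who would rather not puzzle it out, here is a minimal sketch in plain Python, no Spark needed, that mirrors each step of the function on an ordinary list of rows. The name transpose_rows_locally is made up for illustration, and the final conversion to numpy arrays is left out; it only shows the idea, not how Spark distributes the work.

```python
def transpose_rows_locally(rows):
    """Plain-Python mirror of the steps in rddTranspose (illustrative helper)."""
    # zipWithIndex + flatMap: one (row index, column index, value) triple per cell
    cells = [(i, j, e) for i, x in enumerate(rows) for j, e in enumerate(x)]

    # map + groupByKey: key each cell by its column index and
    # collect the (row index, value) pairs that share a column
    by_column = {}
    for i, j, e in cells:
        by_column.setdefault(j, []).append((i, e))

    transposed = []
    # sortByKey: the old column index decides the order of the new rows
    for j in sorted(by_column):
        # sorted(...): the old row index decides the order inside a new row
        pairs = sorted(by_column[j], key=lambda ie: ie[0])
        # last map: drop the row index and keep only the values
        transposed.append([e for _, e in pairs])
    return transposed


result = transpose_rows_locally([[1, 2, 3], [4, 5, 6]])
print(result)
```

The sort inside each new row matters on a cluster, because groupByKey gives no guarantee about the order in which the values of one key arrive.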