Progressively Uploading CSV

30/Jan/2014 Posted By brijesh

Scenario

You may have encountered situations where you need to send a huge amount of data (in GBs) to the server; uploading large files is one example. While this data is being sent, your server has to slog through the incoming request body and eventually process it.

Problem Definition

So the whole process becomes: get a chunk of data, wait for the next chunk to arrive, wait again, and so on until the entire request body has been received. All of the request body data is now in the server's memory, and multiple simultaneous uploads would invariably lead to an OOM error. Moreover, until the request body is saved to disk, the file data resides in memory and blocks a thread. As you would imagine, both memory and threads are precious and scarce resources. Under such conditions, what can you expect but frustratingly slow responses? Would it not be nice if there were a way to handle such requests in a smarter way?

Solution

As the browser sends data over the network in chunks of bytes, processing the incoming data in chunks is the smarter choice. The Play 2 framework provides exactly this capability in the form of its Iteratee library. Iteratees support the consumption of data in chunks and in an asynchronous manner. For more detail, you can browse through the Iteratee documentation.

Iteratee

For now, you need to understand what an Iteratee can do to solve our problem. An Iteratee requires a producer that feeds it the data to process. The producer can be any Enumerator capable of producing data of the same type the Iteratee expects. The Iteratee consumes chunks of data progressively: it does not need all the data to be readily available, it consumes each chunk whenever one arrives, and it does not block waiting on upcoming chunks; it is the producer's responsibility to feed the Iteratee. After receiving a data chunk, the Iteratee can start processing it. Below we will see example code for uploading a CSV file progressively. This is a Play-Scala application, so you need to set up the required environment. Once you have done that, create a Play application and you are ready to go further. Here are instructions on how to install and set up Play.
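To make this producer/consumer pairing concrete before the upload code, here is a minimal standalone sketch (the object name IterateeDemo and the sample data are only illustrative): an Enumerator plays the role of the browser, feeding byte chunks to an Iteratee that folds over each chunk as it arrives, accumulating a total without buffering the whole stream.

import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object IterateeDemo extends App {
  // Producer: emits three byte chunks, much like a browser streaming an upload
  val producer: Enumerator[Array[Byte]] =
    Enumerator("name,age\n", "alice,30\n", "bob,25\n").map(_.getBytes)

  // Consumer: folds over each chunk as it arrives, keeping only a byte count
  val byteCounter: Iteratee[Array[Byte], Long] =
    Iteratee.fold(0L)((total, chunk: Array[Byte]) => total + chunk.length)

  // Feed the producer into the consumer and run it to a Future result
  val totalBytes = producer |>>> byteCounter
  println(Await.result(totalBytes, 5.seconds)) // prints 25
}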

Code

Make entries for your routes in the conf/routes file:

# Opens the home page
GET     /            controllers.Application.index

# POST request to upload the file
POST    /upload      controllers.Application.upload

Create the Scala HTML template index.scala.html:
@helper.form(action = routes.Application.upload, 'enctype -> "multipart/form-data") {
    <p>Please upload file</p>
    <input type="file" name="file">
    <input type="submit" value="Upload">
}

Add the following methods in the Application controller to handle these requests:
def index = Action {
Ok(views.html.index())
}
The index method simply serves the home page from which the CSV file can be uploaded.
def upload = Action(BodyParser(request => CsvBodyParser.parseCsvData(false))) {
  rq: Request[List[String]] =>
    Ok("file uploaded successfully")
}
The upload method handles the POST request containing the body. You can see that a BodyParser is used explicitly with the Action. By default, every Action takes play.api.mvc.AnyContent as its BodyParser to parse the request into some Scala value; it adapts automatically according to the request's Content-Type. A BodyParser[A] is basically an Iteratee[Array[Byte], A]: it consumes the chunks of data as Array[Byte] as the browser sends them and returns a value of type A, which is passed into the Request to be processed by the Action.
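As a minimal sketch of writing such a BodyParser yourself (the names byteCount and size are hypothetical, not part of this application), here is one that streams the body and keeps only a running byte count, wrapping the final value in Right to hand it to the Action:

import play.api.mvc._
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext

object Demo extends Controller {
  // Under the hood a BodyParser[Long] is an Iteratee[Array[Byte], Either[Result, Long]]:
  // it consumes the body chunk by chunk and never holds it all in memory
  val byteCount: BodyParser[Long] = BodyParser("byteCount") { requestHeader =>
    Iteratee
      .fold(0L)((total, chunk: Array[Byte]) => total + chunk.length)
      .map(count => Right(count)) // Right passes the parsed value to the Action
  }

  def size = Action(byteCount) { request: Request[Long] =>
    Ok("Request body was " + request.body + " bytes")
  }
}

Here is the helper class CsvBodyParser, which parses the request and returns a List[String]: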
package controllers

import play.api.libs.iteratee.{Enumeratee, Iteratee, Parsing}
import play.api.libs.concurrent.Execution.Implicits.defaultContext

object CsvBodyParser {

  var flag = false
  var headerList: List[String] = null

  def parseCsvData(f: Boolean) = {
    headerList = null
    flag = f
    val separator = ","
    // Split the incoming byte stream on newline characters
    Parsing.search("\n".getBytes) ><> Enumeratee.grouped(
      (Enumeratee.breakE[Parsing.MatchInfo[Array[Byte]]](_.isMatch) ><>
        Enumeratee.collect {
          // Keep only the bytes between newlines and turn them into a String
          case Parsing.Unmatched(bytes) => new String(bytes)
        } &>>
        Iteratee.consume()).flatMap { r =>
        if (flag) {
          // Header already seen: treat this line as a data row
          Iteratee.head.map(_ => processLine(r.trim.split(separator)))
        } else {
          // Still looking for the header row, identified here by its first column, "name"
          val list = r.trim.split(separator).toList
          if (list.nonEmpty && list.head.equals("name")) {
            headerList = list
            flag = true
          }
          Iteratee.head.map(_ => "")
        }
      }
    ) &>> Iteratee.getChunks.map(Right(_))
  }

  def processLine(line: Array[String]): String = {
    val dataList = line.toList
    var msg = ""
    if (dataList.length >= 1) {
      val dataMap = headerList.zip(dataList).toMap
      // dataMap contains the header and corresponding column for the current row
      // You can write your logic to add this into a database or whatever you want
      // msg = "any success message you can pass"
    }
    msg
  }
}

Explanation

In our case, as we are parsing a CSV file, every row is separated by a newline character. Therefore we need chunks split on newlines to distinguish each line and process it separately. But the browser creates chunks on its own, irrespective of what the separator is. Our requirement is an Iteratee that can consume each line in one go and process it. Our source of data here is chunks of Array[Byte], each of which may contain no newline character or more than one. Therefore the stream has to be adapted so that it matches the type our Iteratee expects.

Enumeratee

To adapt an Enumerator of one type into another type we have the Enumeratee, which is a pipe adapter between an Enumerator and an Iteratee. It basically transforms an Enumerator of one type into another. For example, an Enumerator[Int] can be transformed into an Enumerator[String] by applying an Enumeratee[Int, String] to it. An Enumeratee[A, B] can also be composed with an Enumeratee[B, C] to give an Enumeratee[A, C]. In the parseCsvData method above you can see we have used Parsing.search("\n".getBytes). It searches for newlines and gives an Enumeratee[Array[Byte], Parsing.MatchInfo[Array[Byte]]]. We then compose it with Enumeratee.grouped to regroup the stream on newlines.
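As a quick standalone illustration of that adaptation (EnumerateeDemo is just an illustrative name), here is an Enumeratee[Int, String] applied to an Enumerator[Int], followed by a composition via ><>:

import play.api.libs.iteratee.{Enumerator, Enumeratee, Iteratee}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object EnumerateeDemo extends App {
  val ints: Enumerator[Int] = Enumerator(1, 22, 333)

  // Enumeratee[Int, String]: adapts an Int stream into a String stream
  val intToString: Enumeratee[Int, String] = Enumeratee.map[Int](i => i.toString)

  // Composing Enumeratee[Int, String] with Enumeratee[String, Int] via ><>
  // yields an Enumeratee[Int, Int]
  val stringLength: Enumeratee[String, Int] = Enumeratee.map[String](_.length)
  val composed: Enumeratee[Int, Int] = intToString ><> stringLength

  // &> applies an Enumeratee to an Enumerator; the result feeds an Iteratee
  val strings = ints &> intToString |>>> Iteratee.getChunks
  println(Await.result(strings, 5.seconds)) // List("1", "22", "333")

  val lengths = ints &> composed |>>> Iteratee.getChunks
  println(Await.result(lengths, 5.seconds)) // List(1, 2, 3)
}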

Applying Enumeratee.breakE[Parsing.MatchInfo[Array[Byte]]](_.isMatch), which is again an adapter, breaks the stream at each newline. It gives a new Enumeratee that pushes everything it has onto the Iteratee; applying Iteratee.consume() then consumes and concatenates all the input chunks and returns a Promise. Finally, by applying flatMap we get the actual data for a row; Iteratee.head, which creates an Iteratee that takes the first element of the stream, swallows the matched newline. A line can be anything: the header of the CSV (the first line), data, or any other information associated with the body. Each column in the CSV is separated by a comma, so we can split on it to get an Array. By checking the first field (here, whether it equals "name"), we can identify the header. As soon as we get the header, we set the flag to true so that every subsequent row is treated as data. Now we can process each line, as we have done in the processLine method, by zipping the header with the columns to generate key-value pairs of header and column. You can do anything with this data to serve your purpose. With a little bit of understanding of Play and Iteratees, we can now asynchronously upload large amounts of data without stretching the memory requirements.
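To see the whole pipeline working end to end, here is a standalone sketch (LineRegroupDemo is an illustrative name, and the leftover-input behaviour assumes the same Play iteratee API used above) that feeds hand-made byte chunks, whose boundaries deliberately fall mid-line, through the same Parsing.search / Enumeratee.grouped combination, and prints each CSV line reassembled whole:

import play.api.libs.iteratee._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object LineRegroupDemo extends App {
  // Chunk boundaries deliberately fall mid-line, as they would off the network
  val networkChunks: Enumerator[Array[Byte]] =
    Enumerator("name,ag", "e\nalice,3", "0\nbob,25").map(_.getBytes)

  // Same pipeline as parseCsvData: split on "\n", collect the unmatched bytes
  // into a line, then swallow the matched newline with Iteratee.head
  val lines: Enumeratee[Array[Byte], String] =
    Parsing.search("\n".getBytes) ><> Enumeratee.grouped(
      (Enumeratee.breakE[Parsing.MatchInfo[Array[Byte]]](_.isMatch) ><>
        Enumeratee.collect[Parsing.MatchInfo[Array[Byte]]] {
          case Parsing.Unmatched(bytes) => new String(bytes)
        } &>>
        Iteratee.consume()).flatMap(line => Iteratee.head.map(_ => line))
    )

  val result = networkChunks &> lines |>>> Iteratee.getChunks
  println(Await.result(result, 5.seconds)) // List("name,age", "alice,30", "bob,25")
}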

Inspired by: https://gist.github.com/sadache/2939230

Category: App Development