Merge pull request #295 from enragedginger/master
Postgres LargeObject reactive streaming action
tminglei authored Aug 9, 2016
2 parents 8a235ea + 8afdb6f commit 23a3f41
Showing 5 changed files with 233 additions and 2 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -23,6 +23,7 @@ Slick-pg
- composite type (`basic`)
- aggregate functions
- window functions
- Large Object


** _tested on `PostgreSQL` `v9.5` with `Slick` `v3.1.1`._
@@ -129,8 +130,7 @@ object tests extends TableQuery(new TestTable(_)) {
...
```

_p.s. above codes are for `Slick` Lifted Embedding SQL. Except that, `slick-pg` also support for `Slick` Plain SQL, for details and usages pls refer to source codes and tests._

_p.s. The code samples above are for `Slick` Lifted Embedding SQL. Aside from that, `slick-pg` also supports `Slick` Plain SQL--for details and usages please refer to source code and tests._


Configurable type/mappers
@@ -0,0 +1,110 @@
package com.github.tminglei.slickpg.lobj

import java.io.InputStream

import org.postgresql.PGConnection
import org.postgresql.largeobject.LargeObjectManager
import slick.dbio.{Effect, Streaming, SynchronousDatabaseAction}
import slick.jdbc.JdbcBackend
import slick.util.DumpInfo

/**
* Action for streaming Postgres LargeObject instances from a Postgres DB.
* @param largeObjectId The oid of the LargeObject to stream.
* @param bufferSize The chunk size in bytes. Defaults to 8KB.
*/
case class LargeObjectStreamingDBIOAction(largeObjectId: Long, bufferSize: Int = 1024 * 8) extends SynchronousDatabaseAction[Array[Byte], Streaming[Array[Byte]], JdbcBackend, Effect.All] {
//our StreamState is the InputStream on the LargeObject instance and the number of bytes read in on the last run.
type StreamState = (InputStream, Int)

/**
* Opens an InputStream on a Postgres LargeObject.
* @param context The current database context.
* @return An InputStream on a Postgres LargeObject.
*/
private def openObject(context: JdbcBackend#Context): InputStream = {
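//Postgres' LargeObject API must be used inside a transaction, so make sure auto-commit is off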
context.connection.setAutoCommit(false)
val largeObjectApi = context.connection.unwrap(classOf[PGConnection]).getLargeObjectAPI
val largeObject = largeObjectApi.open(largeObjectId, LargeObjectManager.READ, false)
largeObject.getInputStream
}

/**
* Reads the next result from the InputStream as an Array of Bytes.
* @param stream The current LargeObject InputStream.
* @return A tuple containing the next chunk of bytes, and an integer indicating the number of bytes read.
*/
private def readNextResult(stream: InputStream): (Array[Byte], Int) = {
val bytes = new Array[Byte](bufferSize)
val bytesRead = stream.read(bytes)
if (bytesRead <= 0) {
//nothing was read, so just return an empty byte array
(new Array[Byte](0), bytesRead)
} else if (bytesRead < bufferSize) {
//the read operation hit the end of the stream, so remove the unneeded cells
val actualBytes = new Array[Byte](bytesRead)
bytes.copyToArray(actualBytes)
(actualBytes, bytesRead)
} else {
(bytes, bytesRead)
}
}

/**
* Run this action. This is currently unsupported as this action only works for streaming and will throw
* an UnsupportedOperationException.
* @param context The current database context.
* @throws UnsupportedOperationException Always; this action only supports streaming.
*/
override def run(context: JdbcBackend#Context): Array[Byte] = throw new UnsupportedOperationException("Method 'run' is not supported for this action type.")

override def getDumpInfo = DumpInfo(name = "LargeObjectStreamingDBIOAction")

/**
* Emits at most `limit` events to the context's stream.
* @param context The current database context.
* @param limit The maximum number of events to emit back to the stream.
* @param state The state of the stream as returned by the previous iteration.
* @return The new stream state.
*/
override def emitStream(context: JdbcBackend#StreamingContext, limit: Long, state: StreamState): StreamState = {
  //open the stream iff no stream state exists yet
  val (stream, previousBytesRead) =
    if (state == null) (openObject(context), 1)
    else state

  //read and emit chunks until the limit is reached or the stream is exhausted
  var count = 0L
  var bytesRead = previousBytesRead
  while (count < limit && bytesRead > 0) {
    val (bytes, read) = readNextResult(stream)
    bytesRead = read
    context.emit(bytes)
    count += 1
  }

  //if the final bytesRead value was non-positive, close the stream and return a null StreamState
  //to indicate the end of this stream
  if (bytesRead <= 0) {
    stream.close()
    null
  } else (stream, bytesRead)
}

/**
* Cancels this stream and closes the underlying InputStream.
* @param context The current database context.
* @param state The current StreamState at the time of the cancelling.
*/
override def cancelStream(context: JdbcBackend#StreamingContext, state: StreamState): Unit = {
if (state != null) {
val (stream, _) = state
stream.close()
}
}
}

@@ -0,0 +1,52 @@
package com.github.tminglei.slickpg.lobj

import java.io.InputStream

import com.github.tminglei.slickpg.ExPostgresDriver
import org.postgresql.PGConnection
import org.postgresql.largeobject.LargeObjectManager

/**
* Adds functionality for creating LargeObject upload actions
*/
trait LargeObjectSupport { driver: ExPostgresDriver =>

import driver.api._

/**
* Builds an action for uploading Large Object instances to the database.
* @param largeObjectStream The input stream containing the large object to upload.
* @param bufferSize The number of bytes to process in each write loop.
* @return A DBIO action which creates a Large Object in the database and returns the object's OID.
*/
def buildLargeObjectUploadAction(largeObjectStream: InputStream, bufferSize: Int = 4096): SimpleDBIO[Long] = {
SimpleDBIO { khan =>
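//like reads, LargeObject writes must happen inside a transaction, so disable auto-commit first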
khan.connection.setAutoCommit(false)
val largeObjectApi = khan.connection.unwrap(classOf[PGConnection]).getLargeObjectAPI
val largeObjectId = largeObjectApi.createLO()
val largeObject = largeObjectApi.open(largeObjectId, LargeObjectManager.WRITE)

val bytes = new Array[Byte](bufferSize)

Iterator.continually {
val bytesRead = largeObjectStream.read(bytes)
val bytesToWrite = if (bytesRead <= 0) {
//nothing was read, so just return an empty byte array
new Array[Byte](0)
} else if (bytesRead < bufferSize) {
//the read operation hit the end of the stream, so remove the unneeded cells
val actualBytes = new Array[Byte](bytesRead)
bytes.copyToArray(actualBytes)
actualBytes
} else {
bytes
}

largeObject.write(bytesToWrite)
bytesRead
}.takeWhile { _ > 0 }.length //call .length to force evaluation
largeObject.close()
largeObjectId
}
}
}
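Because `LargeObjectSupport` declares an `ExPostgresDriver` self-type, it is meant to be mixed into a profile. A minimal sketch of that wiring, with illustrative names (`MyPostgresDriver` is not part of this commit):

```scala
import com.github.tminglei.slickpg.ExPostgresDriver
import com.github.tminglei.slickpg.lobj.LargeObjectSupport

//illustrative custom profile that exposes the large-object upload action
trait MyPostgresDriver extends ExPostgresDriver with LargeObjectSupport
object MyPostgresDriver extends MyPostgresDriver
```

Upload actions are then available as `MyPostgresDriver.buildLargeObjectUploadAction(...)`, alongside the rest of the profile's API.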
33 changes: 33 additions & 0 deletions core/src/main/scala/com/github/tminglei/slickpg/lobj/README.md
@@ -0,0 +1,33 @@
Large Objects
--------------
This project supports reading from and writing to Postgres' Large Object store via reactive streams.
Given a Large Object oid, you can stream a Large Object like so:
```scala
import com.github.tminglei.slickpg.lobj._

val action = LargeObjectStreamingDBIOAction(
largeObjectId = 1L,
bufferSize = 16 * 1024 //chunk size in bytes for each read; 16 * 1024 emits 16KB chunks
)
val largeObjectStream: DatabasePublisher[Array[Byte]] = db.stream(action) //create the publishing stream on the object

//turn it into an Akka Stream Source for fun
val src = akka.stream.scaladsl.Source.fromPublisher(largeObjectStream)

//maybe turn it into a Scala Play chunked response if you're into that kind of thing
Ok.chunked(src).
as("whatever/your-mimetype-is").
withHeaders(
"Content-Disposition" -> "attachment; filename=somefilename.txt"
)
```
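If you are not using Akka or Play, the publisher can also be consumed directly. A minimal sketch that gathers the chunks in memory (fine for small objects; `buffer` and `allBytes` are just illustrative names), reusing `largeObjectStream` from above:
```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

//collect every emitted chunk into a single byte array
val buffer = ArrayBuffer.empty[Byte]
val done = largeObjectStream.foreach(chunk => buffer ++= chunk)
Await.result(done, 30.seconds)
val allBytes: Array[Byte] = buffer.toArray
```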

You can upload LargeObjects by wrapping them with an InputStream and then
running the appropriate DB action like so:
```scala

val inputStream: InputStream = ??? //whatever data you want to upload
val driver = new LargeObjectSupport with ExPostgresDriver {}
val action = driver.buildLargeObjectUploadAction(inputStream)
val largeObjectIdFuture: Future[Long] = db.run(action.transactionally)
```
@@ -0,0 +1,36 @@
package com.github.tminglei.slickpg.lobj

import java.io.ByteArrayInputStream
import java.util.concurrent.Executors

import com.github.tminglei.slickpg.{ExPostgresDriver, utils}
import org.scalatest.FunSuite

import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success}
import scala.concurrent.duration._

class LargeObjectSupportSuite extends FunSuite {
implicit val testExecContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
import ExPostgresDriver.api._

val driver = new LargeObjectSupport with ExPostgresDriver {}

val db = Database.forURL(url = utils.dbUrl, driver = "org.postgresql.Driver")

test("upload and download large object") {
val testString = "some string to store as a large object"
val largeObjectUploadStream = new ByteArrayInputStream(testString.getBytes)
val uploadAction = driver.buildLargeObjectUploadAction(largeObjectUploadStream)
val composedAction = uploadAction.flatMap(oid => LargeObjectStreamingDBIOAction(oid))
val dbPublisher = db.stream(composedAction.transactionally)

val messageBuffer: StringBuffer = new StringBuffer()
val f = dbPublisher.foreach(bytes => messageBuffer.append(new String(bytes))).andThen {
case Success(_) => assert(messageBuffer.toString == testString)
case Failure(error) => throw error
}

Await.result(f, 60.seconds)
}
}
