Implement DBIO

This commit is contained in:
jilen 2025-07-24 18:43:02 +08:00
parent f2828ef494
commit 70bbb18119
8 changed files with 181 additions and 85 deletions

View file

@ -316,6 +316,9 @@ private inline def transform2[A, A1, B](inline q1: Quoted)(
inline def alias(inline from: String, inline to: String): PropertyAlias =
PropertyAlias(List(from), to)
inline def query[E](using m: Mirror.ProductOf[E]): EntityQuery[E] =
query(constValue[m.MirroredLabel])
inline def query[E](
inline table: String,
inline alias: PropertyAlias*

View file

@ -2,56 +2,13 @@ package minisql.context
import minisql.util.*
import minisql.idiom.{Idiom, Statement, ReifyStatement}
import minisql.{NamingStrategy, ParamEncoder}
import minisql.ColumnDecoder
import minisql.{NamingStrategy, ParamEncoder, ColumnDecoder}
import minisql.ast.{Ast, ScalarValueLift, CollectAst}
import scala.deriving.*
import scala.compiletime.*
import scala.util.{Try, Success, Failure}
import scala.annotation.targetName
trait RowExtract[A, Row] {
def extract(row: Row): Try[A]
}
object RowExtract {
private[context] def single[Row, E](
decoder: ColumnDecoder.Aux[Row, E]
): RowExtract[E, Row] = new RowExtract[E, Row] {
def extract(row: Row): Try[E] = {
decoder.decode(row, 0)
}
}
private def extractorImpl[A, Row](
decoders: IArray[Any],
m: Mirror.ProductOf[A]
): RowExtract[A, Row] = new RowExtract[A, Row] {
def extract(row: Row): Try[A] = {
val decodedFields = decoders.zipWithIndex.traverse {
case (d, i) =>
d.asInstanceOf[ColumnDecoder.Aux[Row, ?]].decode(row, i)
}
decodedFields.map { vs =>
m.fromProduct(Tuple.fromIArray(vs))
}
}
}
inline given [P <: Product, Row, Decoder[_]](using
m: Mirror.ProductOf[P]
): RowExtract[P, Row] = {
val decoders =
summonAll[
Tuple.Map[m.MirroredElemTypes, [X] =>> ColumnDecoder[
X
] { type DBRow = Row }]
]
extractorImpl(decoders.toIArray.asInstanceOf, m)
}
}
trait Context[I <: Idiom, N <: NamingStrategy] { selft =>
val idiom: I
@ -67,11 +24,22 @@ trait Context[I <: Idiom, N <: NamingStrategy] { selft =>
type Decoder[X] = ColumnDecoder.Aux[DBRow, X]
type DBIO[E] = (
enum DBIO[E] {
case Exec(stmt: DBStatement, mapp: Iterable[DBRow] => Try[E])
case Pure(v: E)
case Raise(e: Throwable)
case FlatMap[E1, E](dbio: DBIO[E1], f: E1 => DBIO[E]) extends DBIO[E]
}
protected def prepare(
sql: String,
params: List[(Any, Encoder[?])],
params: List[(Any, Encoder[?])]
): DBStatement
private def dbio[E](
stmt: DBStatement,
mapper: Iterable[DBRow] => Try[E]
)
): DBIO.Exec[E] = DBIO.Exec(stmt, mapper)
extension (ast: Ast) {
private def liftMap = {
@ -91,7 +59,7 @@ trait Context[I <: Idiom, N <: NamingStrategy] { selft =>
}
@targetName("ioAction")
inline def io[E](inline q: minisql.Action[E]): DBIO[E] = {
inline def io[E](inline q: minisql.Action[E]): DBIO.Exec[E] = {
val extractor = summonFrom {
case e: RowExtract[E, DBRow] => e
case e: ColumnDecoder.Aux[DBRow, E] =>
@ -101,21 +69,24 @@ trait Context[I <: Idiom, N <: NamingStrategy] { selft =>
val lifts = q.liftMap
val stmt = minisql.compile[I, N](q, idiom, naming)
val (sql, params) = stmt.expand(lifts)
(
sql = sql,
params = params.map(_.value.get.asInstanceOf[(Any, Encoder[?])]),
mapper = (rows) =>
dbio(
prepare(
sql,
params.map(_.value.get.asInstanceOf[(Any, Encoder[?])])
),
(rows) => {
rows
.traverse(extractor.extract)
.flatMap(
_.headOption.toRight(new Exception(s"No value return")).toTry
)
}
)
}
inline def io[E](inline q: minisql.Agg[E])(using
e: ColumnDecoder.Aux[DBRow, E]
): DBIO[E] = {
): DBIO.Exec[E] = {
val mapper: Iterable[DBRow] => Try[E] = summonFrom {
case _: (E <:< Option[?]) =>
(rows: Iterable[DBRow]) =>
@ -146,17 +117,16 @@ trait Context[I <: Idiom, N <: NamingStrategy] { selft =>
val lifts = q.liftMap
val stmt = minisql.compile[I, N](q, idiom, naming)
val (sql, params) = stmt.expand(lifts)
(
sql = sql,
params = params.map(_.value.get.asInstanceOf[(Any, Encoder[?])]),
mapper = mapper
dbio(
prepare(sql, params.map(_.value.get.asInstanceOf[(Any, Encoder[?])])),
mapper
)
}
@targetName("ioQuery")
inline def io[E](
inline q: minisql.Query[E]
): DBIO[IArray[E]] = {
): DBIO.Exec[IArray[E]] = {
val (stmt, extractor) = summonFrom {
case e: RowExtract[E, DBRow] =>
@ -167,10 +137,9 @@ trait Context[I <: Idiom, N <: NamingStrategy] { selft =>
val lifts = q.liftMap
val (sql, params) = stmt.expand(lifts)
(
sql = sql,
params = params.map(_.value.get.asInstanceOf[(Any, Encoder[?])]),
mapper = (rows) => rows.traverse(extractor.extract)
dbio(
prepare(sql, params.map(_.value.get.asInstanceOf[(Any, Encoder[?])])),
(rows) => rows.traverse(extractor.extract)
)
}

View file

@ -3,6 +3,7 @@ package minisql
import minisql.context.mirror.*
import minisql.util.Messages.fail
import scala.reflect.ClassTag
import scala.util.Try
class MirrorContext[Idiom <: idiom.Idiom, Naming <: NamingStrategy](
val idiom: Idiom,
@ -12,7 +13,26 @@ class MirrorContext[Idiom <: idiom.Idiom, Naming <: NamingStrategy](
type DBRow = IArray[Any] *: EmptyTuple
type DBResultSet = Iterable[DBRow]
type DBStatement = Map[Int, Any]
type DBStatement = (sql: String, params: Array[Any])
extension (io: DBIO.Exec[?]) {
def sql: String = io.stmt.sql
def params: Array[Any] = io.stmt.params
}
protected def prepare(
sql: String,
params: List[(Any, Encoder[?])]
): DBStatement = {
val stmt = (sql, Array.ofDim[Any](params.size))
params.zipWithIndex.map {
case ((v, e), i) => e.setParam(stmt, i, v)
}
stmt
}
extension (r: DBRow) {

View file

@ -0,0 +1,49 @@
package minisql.context
import minisql.ColumnDecoder
import scala.deriving.*
import scala.compiletime.*
import scala.util.{Try, Success, Failure}
import minisql.util.traverse
trait RowExtract[A, Row] {
def extract(row: Row): Try[A]
}
object RowExtract {
private[context] def single[Row, E](
decoder: ColumnDecoder.Aux[Row, E]
): RowExtract[E, Row] = new RowExtract[E, Row] {
def extract(row: Row): Try[E] = {
decoder.decode(row, 0)
}
}
private def extractorImpl[A, Row](
decoders: IArray[Any],
m: Mirror.ProductOf[A]
): RowExtract[A, Row] = new RowExtract[A, Row] {
def extract(row: Row): Try[A] = {
val decodedFields = decoders.zipWithIndex.traverse {
case (d, i) =>
d.asInstanceOf[ColumnDecoder.Aux[Row, ?]].decode(row, i)
}
decodedFields.map { vs =>
m.fromProduct(Tuple.fromIArray(vs))
}
}
}
inline given [P <: Product, Row, Decoder[_]](using
m: Mirror.ProductOf[P]
): RowExtract[P, Row] = {
val decoders =
summonAll[
Tuple.Map[m.MirroredElemTypes, [X] =>> ColumnDecoder[
X
] { type DBRow = Row }]
]
extractorImpl(decoders.toIArray.asInstanceOf, m)
}
}

View file

@ -15,7 +15,8 @@ trait MirrorCodecs {
final protected def mirrorEncoder[V]: Encoder[V] = new ParamEncoder[V] {
type Stmt = ctx.DBStatement
def setParam(s: Stmt, idx: Int, v: Any): Stmt = {
s + (idx -> v)
s.params(idx) = v
s
}
}
@ -58,7 +59,7 @@ trait MirrorCodecs {
v match {
case Some(value) => e.setParam(s, idx, value)
case None =>
s + (idx -> null)
e.setParam(s, idx, null)
}
}

View file

@ -0,0 +1,24 @@
package minisql.context.sql
import munit.FunSuite
import minisql.*
case class Person(name: String, age: Int)
case class Couple(her: String, him: String)
object Person {
val peopleEntries = List(
Person("Alex", 60),
Person("Bert", 55),
Person("Cora", 33),
Person("Drew", 31),
Person("Edna", 21),
Person("Fred", 60)
)
}
inline def Persons = query[Person]
extension (inline context: SqlContext[?, ?]) {
inline def initData(v: Person) = context.io(Persons.insert(v))
}

View file

@ -6,7 +6,17 @@ import java.time.LocalDate
import minisql.{ParamEncoder, ColumnDecoder}
import scala.util.*
type AsyncStmt = (String, Array[Any])
opaque type AsyncStmt = (String, Array[Any])
object AsyncStmt {
extension (stmt: AsyncStmt) {
def sql: String = stmt._1
def encodedParams: IArray[Any] = IArray.unsafeFromArray(stmt._2)
}
def apply(sql: String, paramSizeHint: Int): AsyncStmt =
(sql, Array.ofDim[Any](paramSizeHint))
}
type AsyncEncoder[T] = ParamEncoder[T] { type Stmt = AsyncStmt }
private def asyncEncoder[A](

View file

@ -1,7 +1,7 @@
package minisql.context
import cats.syntax.all.*
import cats.effect.Async
import cats.effect.*
import minisql.context.sql.*
import minisql.context.sql.idiom.PostgresDialect
import minisql.{NamingStrategy, ParamEncoder}
@ -13,7 +13,7 @@ import scala.util.{Try, Success, Failure}
class PgAsyncContext[F[_], I <: PostgresDialect, N <: NamingStrategy](
val naming: N,
val idiom: I,
connection: PostgreSQLConnection
pool: (take: Resource[F, PostgreSQLConnection], release: F[Unit])
)(using Async[F])
extends SqlContext[I, N]
with AsyncCodecs {
@ -24,24 +24,44 @@ class PgAsyncContext[F[_], I <: PostgresDialect, N <: NamingStrategy](
private given ExecutionContext = ExecutionContext.parasitic
def run[E](dbio: DBIO[E]): F[E] = {
val (sql, params, mapper) = dbio
val initStmt = (sql, Array.ofDim[Any](params.size))
val encodedParams = params.zipWithIndex.map {
case ((value, encoder), i) =>
encoder.setParam(initStmt, i, value)
}
Async[F].fromFuture {
Async[F].delay {
connection.sendPreparedStatement(sql, encodedParams).map { result =>
mapper(result.rows.get).get
}
}
protected def prepare(
sql: String,
params: List[(Any, Encoder[?])]
): DBStatement = {
val stmt = AsyncStmt(sql, params.size)
params.zipWithIndex.foreach {
case ((v, e), i) =>
e.setParam(stmt, i, v)
}
stmt
}
def close(): F[Unit] =
Async[F].fromFuture(Async[F].delay(connection.disconnect)).void
def run[E](dbio: DBIO[E]): F[E] = {
def evalLoop[X](step: DBIO[X]): F[X] = step match {
case DBIO.Pure(v) =>
v.pure[F]
case DBIO.Raise(e) =>
e.raiseError[F, X]
case DBIO.FlatMap(io, f) =>
evalLoop(io).flatMap { a =>
evalLoop(f(a))
}
case DBIO.Exec(stmt, mapper) =>
pool.take.use { c =>
Async[F].fromFuture {
Async[F].delay {
c.sendPreparedStatement(stmt.sql, stmt.encodedParams).map {
result =>
mapper(result.rows.get).get
}
}
}
}
}
evalLoop(dbio)
}
def close(): F[Unit] = pool.release
}