Scala Library Quick #21

Open
w4n9H opened this issue Aug 10, 2018 · 0 comments
Labels
Java — Java-family languages, e.g. Scala, Clojure, Spark


Scala Library

Commonly used parts of the Scala standard library, plus popular third-party libraries.

Scala standard library: scala.collection, scala.io, scala.concurrent, scala.math, scala.sys

Java standard library: java.net, java.io

ORM: Slick, Quill, Squeryl, Anorm (Play's persistence layer), Scala ActiveRecord (built on Squeryl)

Web frameworks: spray, Play, Scalatra, Lift

Big data: Spark, Elasticsearch, Kafka, HBase, Hive, HDFS, etc.

File operations

Reading files

import scala.io.Source
Source.fromFile("test.txt").foreach(print)               // character by character
Source.fromFile("test.txt").getLines().foreach(println)  // line by line
Source.fromFile("test.txt").getLines().length            // number of lines
Source.fromURL("http://www.baidu.com", "UTF-8")          // the URL scheme is required
Source.fromString("Hello World!")                        // wrap a String in a Source
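Note that these one-liners never close the underlying file handle. A minimal leak-free sketch, assuming Scala 2.13+ for scala.util.Using (the file name demo.txt is illustrative; the snippet writes the file first so it is self-contained):

```scala
import java.nio.file.{Files, Paths}
import scala.io.Source
import scala.util.Using // Scala 2.13+

object ReadDemo extends App {
  // create a small file so the example is self-contained
  Files.write(Paths.get("demo.txt"), "line1\nline2\n".getBytes("UTF-8"))

  // Using.resource closes the Source even if the body throws
  val lineCount = Using.resource(Source.fromFile("demo.txt")) { src =>
    src.getLines().length
  }
  println(lineCount)  // 2
}
```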

Writing files

import java.io.PrintWriter
val out = new PrintWriter("test1.txt")
(1 to 100).foreach(y => out.println(y.toString))
out.flush()
out.close()

Traversing a directory tree

import java.io.File
// Recursively count the regular files under a directory.
// listFiles() returns null for a path that is not a directory, hence the Option guard.
case class SumDir(var total: Int = 0) {
  def sumDir(dirName: String): Int = {
    Option(new File(dirName).listFiles()).getOrElse(Array.empty[File]).foreach { f =>
      if (f.isDirectory) sumDir(f.getPath) else this.total += 1
    }
    this.total
  }
}
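The same traversal can also be written without mutable state; a minimal sketch (countFiles is a hypothetical helper name):

```scala
import java.io.File

// Count regular files under dir, recursing into subdirectories.
// listFiles() returns null for non-directories, hence the Option guard.
def countFiles(dir: File): Int =
  Option(dir.listFiles()).getOrElse(Array.empty[File])
    .map(f => if (f.isDirectory) countFiles(f) else 1)
    .sum

// countFiles(new File("."))  // file count for the current directory
```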

Network operations

Based on java.net

import java.net.{URL, HttpURLConnection}
import scala.io.Source

val connection = new URL("http://baidu.com").openConnection.asInstanceOf[HttpURLConnection]
connection.setConnectTimeout(5000)
connection.setReadTimeout(5000)
connection.setRequestMethod("GET")
println(Source.fromInputStream(connection.getInputStream, "utf-8").mkString)

Concurrency and asynchrony

Simple concurrent and asynchronous operations; anything deeper deserves a separate document.

import scala.concurrent.{Future, Promise, ExecutionContext, Await}
import ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Success, Failure}

// A simple Future example
def worker(s:String): String = {
  Thread.sleep(2000)
  s + " result"
}
def main(args: Array[String]): Unit = {
  val r = Future(worker("wang"))
  Thread.sleep(3000)
  println(r.isCompleted)  // has the future completed?
  println(r.value)        // the value, as Option[Try[String]]
}
// An alternative style
def worker(): Future[String] = Future {  // Future.apply runs the block asynchronously
  println("begin worker")
  Thread.sleep(2000)
  println("end worker")
  "result"
}
def main(args: Array[String]): Unit = {
  val r = worker()
  Thread.sleep(3000)
  r.onComplete {  // completion callback
    case Success(result) => println(result)
    case Failure(e) => println("error: " + e.getMessage)
  }
}
// Using Await
def main(args: Array[String]): Unit = {
  val r = Await.result(worker(), Duration.Inf)  // block until the result is ready
  println(r)
}
// map and flatMap compose futures: a Future can be used directly in map,
// flatMap, or a for-comprehension; the result is another Future, and the
// callback fires once all the required futures have completed
def main(args: Array[String]): Unit = {
  val rateQuote = Future {
    1
  }
  println(rateQuote)
  val purchase = rateQuote map { quote =>
    quote + 1
  }
  println(purchase)
  Thread.sleep(2000)
  purchase foreach { x =>
    println(x)
  }
}
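The same composition reads more naturally as a for-comprehension, which desugars to flatMap and map; a minimal sketch (Await here is used only to print the combined result):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ForDemo extends App {
  val rate = Future { 1 }
  val fee  = Future { 2 }
  // desugars to rate.flatMap(a => fee.map(b => a + b))
  val total = for { a <- rate; b <- fee } yield a + b
  println(Await.result(total, Duration.Inf))  // 3
}
```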

ORM

Slick sbt

libraryDependencies ++= Seq(
  "com.typesafe.slick" %% "slick" % "3.2.3",
  "org.slf4j" % "slf4j-nop" % "1.6.4",
  "org.xerial" % "sqlite-jdbc" % "3.20.0",
  "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3"
)

Slick with SQLite

import slick.jdbc.SQLiteProfile.api._
import scala.util.{Success, Failure}
import scala.concurrent.{Await, ExecutionContext}
import ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration


object db_client {
  class Suppliers(tag: Tag) extends Table[(Int, String, String, String, String, String)](tag, "SUPPLIERS") {
    def id = column[Int]("SUP_ID", O.PrimaryKey) // This is the primary key column
    def name = column[String]("SUP_NAME")
    def street = column[String]("STREET")
    def city = column[String]("CITY")
    def state = column[String]("STATE")
    def zip = column[String]("ZIP")
    // Every table needs a * projection with the same type as the table's type parameter
    def * = (id, name, street, city, state, zip)
  }
  val suppliers = TableQuery[Suppliers]

  def main(args: Array[String]): Unit = {
    val db = Database.forURL(
      url = "jdbc:sqlite:test.db",
      driver = "org.sqlite.JDBC"
    )
    val setup = DBIO.seq(
      suppliers.schema.create,
      suppliers += (101, "Acme, Inc.",      "99 Market Street", "Groundsville", "CA", "95199"),
      suppliers += ( 49, "Superior Coffee", "1 Party Place",    "Mendocino",    "CA", "95460"),
      suppliers += (150, "The High Ground", "100 Coffee Lane",  "Meadows",      "CA", "93966")
    )
    val setupFuture = db.run(setup)
    setupFuture.onComplete {
      case Success(result) => println(result)
      case Failure(e) => println("error: " + e.getMessage)
    }
    Await.result(setupFuture, Duration.Inf)  // wait for schema + inserts before querying
    val x = Await.result(db.run(suppliers.result), Duration.Inf)
    x.foreach(println)
  }
}

Quill sbt

libraryDependencies ++= Seq(
  "org.xerial" % "sqlite-jdbc" % "3.20.0",
  "io.getquill" %% "quill-jdbc" % "2.5.4"
)

Elasticsearch

sbt configuration

libraryDependencies ++= Seq(
  "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.3.3"
)

Creating a connection

import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
val client = ElasticClient(ElasticProperties("http://192.168.55.237:9200"))

Creating an index

import com.sksamuel.elastic4s.http.ElasticDsl._
val r = client.execute {
  createIndex("artists")  // a mapping can also be supplied via the mappings parameter
}.await
println(r.body)  // Some({"acknowledged":true,"shards_acknowledged":true})

Deleting an index

val r = client.execute {
  deleteIndex("artists")
}.await
println(r.body)  // Some({"acknowledged":true})

Indexing documents

val r = client.execute {
  indexInto("artists" / "modern")  // index name / type
    .fields("size" -> 11)
    .refresh(RefreshPolicy.Immediate)
}.await
println(r)
// multiple fields: .fields("size" -> 12, "name" -> "w")
// or pass a Map:   .fields(Map("size" -> 13, "name" -> "y"))

Searching

val resp = client.execute {
  search("artists") bool should(
    matchQuery("size", 1), 
    matchQuery("size", 2)) sortByFieldDesc "size"
}.await
println(resp.result.hits.hits.length)
for (x <- resp.result.hits.hits) {
  println(x.sourceAsString)
}
//sort* : sorting parameters
//bool {must, must_not, should} : boolean compound queries
//matchQuery("name", "y") : match on a single field
//matchAllQuery() from 1 limit 5 : match-all query with paging
//termQuery("name", "y") : term query
//prefix("name", "y") : prefix query
//sourceInclude List("name", "size") : restrict which fields are returned
//rawQuery : raw JSON query string
//query {existsQuery("name")} : exists query
//query {multiMatchQuery("name", "w")} : multi_match query
//query {rangeQuery("size").gt(5)} : range query

Aggregations (aggs)

val resp = client.execute {
  search("artists") aggs {
    cardinalityAgg("test", "name.keyword")
  }
}.await
println(resp.result.aggregations)
//cardinalityAgg("test", "name") : distinct-count aggregation
//termsAgg("test", "size") : terms aggregation
//valueCountAgg
//topHitsAgg
//sumAgg
//maxAgg
//minAgg

Bulk operations

val resp = client.execute {
  bulk(
    indexInto("artists" / "modern").fields("size"->14, "name"->"a"),
    indexInto("artists" / "modern").fields("size"->15, "name"->"b"),
    indexInto("artists" / "modern").fields("size"->16, "name"->"c")
  ).refresh(RefreshPolicy.Immediate)
}.await

Delete and update

delete("AWUTZfmlOa0Oz4s5Vwaf").from("artists" / "modern")
update("id").in("index" / "logtype").doc("name" -> "w")
// both can also be placed inside a bulk(...) call

Kafka

sbt configuration

resolvers += Resolver.bintrayRepo("cakesolutions", "maven")
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "1.1.0",  // the client version must match the broker
  "net.cakesolutions" %% "scala-kafka-client" % "1.1.0"
)

Producer

import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord}
import cakesolutions.kafka.KafkaProducer.Conf
import org.apache.kafka.common.serialization.StringSerializer

object kafka_client {
  def main(args: Array[String]): Unit = {
    val producer = KafkaProducer(
      Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = "192.168.55.237:9092")
    )
    println(producer)
    val record = KafkaProducerRecord("topic_test", Some("key"), "message")
    println(record)
    producer.send(record)
    producer.close()
  }
}

Consumer

import java.util
import cakesolutions.kafka.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object kafka_client {
  def main(args: Array[String]): Unit = {
    val consumerConf = KafkaConsumer.Conf(
      keyDeserializer = new StringDeserializer,
      valueDeserializer = new StringDeserializer,
      bootstrapServers = "192.168.55.180:9092",
      groupId = "mta_data"
    )
    val consumer = KafkaConsumer(consumerConf)
    println(consumer)
    consumer.subscribe(util.Collections.singletonList("http"))
    //println(consumer.listTopics())
    while(true) {
      val records = consumer.poll(100)
      println(records)
      println(records.count())
      records.forEach(r => println(r))  // ConsumerRecords is a java.lang.Iterable
    }
  }
}

Hbase

Hive

TiDB
