Skip to content

Commit

Permalink
Delete topic from CLI. (#307)
Browse files Browse the repository at this point in the history
* Delete topic from CLI.

Closes #186. Inspired by #245

* Use FormParam instead of BothParam.

* Adds unit tests.
  • Loading branch information
plaflamme authored and steveniemitz committed Jun 14, 2017
1 parent 1576216 commit 70e91ee
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 2 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,26 @@ topic-expr examples:
t* - topics starting with 't'
```

Deleting Topics
--------------
Note: needs to be supported and enabled in the broker configuration (i.e.: `delete.topic.enable=true`).
```
#./kafka-mesos.sh help topic delete
Delete topics
Usage: topic delete [<topic-expr>]
Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000
topic-expr examples:
t0 - topic t0
t0,t1 - topics t0, t1
* - any topic
t* - topics starting with 't'
```

Listing topic partition details
-------------------------------
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
configs: Properties
): Unit = KafkaAdminUtils.changeTopicConfig(zkUtils, topic, configs)

override def deleteTopic(topicToDelete: String): Unit =
KafkaAdminUtils.deleteTopic(zkUtils, topicToDelete)

override def fetchEntityConfig(
entityType: String,
entity: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
configs: Properties
): Unit = KafkaAdminUtils.changeTopicConfig(zkClient, topic, configs)

override def deleteTopic(topicToDelete: String): Unit =
KafkaAdminUtils.deleteTopic(zkClient, topicToDelete)

override def fetchEntityConfig(
entityType: String,
entity: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
configs: Properties
): Unit = KafkaAdminUtils.changeTopicConfig(zkUtils, topic, configs)

override def deleteTopic(topicToDelete: String): Unit =
KafkaAdminUtils.deleteTopic(zkUtils, topicToDelete)

override def fetchEntityConfig(
entityType: String,
entity: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ trait AdminUtilsProxy {

def changeTopicConfig(topic: String, configs: Properties)

def deleteTopic(topic: String)

def fetchEntityConfig(entityType: String, entity: String): Properties

def changeClientIdConfig(clientId: String, configs: Properties)
Expand Down
38 changes: 38 additions & 0 deletions src/scala/main/ly/stealth/mesos/kafka/cli/TopicCli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ trait TopicCli {
cmd match {
case "list" => handleList(arg, args)
case "add" | "update" => handleAddUpdate(arg, args, cmd == "add")
case "delete" => handleDelete(arg, args)
case "rebalance" => handleRebalance(arg, args)
case "realign" => handleRealign(arg, args)
case "partitions" => handlePartitions(arg)
Expand All @@ -72,6 +73,8 @@ trait TopicCli {
handleList(null, null, help = true)
case "add" | "update" =>
handleAddUpdate(null, null, cmd == "add", help = true)
case "delete" =>
handleDelete(null, null, help = true)
case "rebalance" =>
handleRebalance(null, null, help = true)
case "realign" =>
Expand Down Expand Up @@ -136,6 +139,40 @@ trait TopicCli {
}
}

def handleDelete(expr: String, args: Array[String], help: Boolean = false): Unit = {
val parser = newParser()

if (help) {
printLine("Delete topics\nUsage: topic delete [<topic-expr>]\n")
handleGenericOptions(null, help = true)

printLine()
printTopicExprExamples()

return
}

val params = new util.LinkedHashMap[String, String]
if (expr != null) params.put("topic", expr)

val json =
try {
sendRequestObj[ListTopicsResponse]("/topic/delete", params)
}
catch {
case e: IOException => throw new Error("" + e)
}

val topics = json.topics
val title = s"topic${if (topics.size > 1) "s" else ""} deleted:"
printLine(title)

topics.foreach { topic =>
printTopic(topic, 1)
printLine()
}
}

def handleAddUpdate(
name: String,
args: Array[String],
Expand Down Expand Up @@ -362,6 +399,7 @@ trait TopicCli {
printLine("Commands:")
printLine("list - list topics", 1)
printLine("add - add topic", 1)
printLine("delete - delete topic", 1)
printLine("update - update topic", 1)
printLine("rebalance - rebalance topics", 1)
printLine("realign - realign topics, keeping existing broker assignments where possible", 1)
Expand Down
6 changes: 6 additions & 0 deletions src/scala/main/ly/stealth/mesos/kafka/scheduler/Topics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class Topics {
AdminUtilsWrapper().assignReplicasToBrokers(brokers_, partitions, replicas, fixedStartIndex, startPartitionId)
}

def deleteTopic(name: String): Topic = {
val topicToDelete = getTopic(name)
if(topicToDelete != null) AdminUtilsWrapper().deleteTopic(topicToDelete.name)
topicToDelete
}

def addTopic(name: String, assignment: Map[Int, Seq[Int]] = null, options: Map[String, String] = null): Topic = {
val config = new Properties()
if (options != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package ly.stealth.mesos.kafka.scheduler.http.api

import java.lang.{Integer => JInt}
import javax.ws.rs.core.{MediaType, Response}
import javax.ws.rs.{DefaultValue, GET, POST, Path, PathParam, Produces}
import javax.ws.rs.{DefaultValue, FormParam, GET, POST, Path, PathParam, Produces}
import ly.stealth.mesos.kafka.scheduler.{Expr, Rebalancer}
import ly.stealth.mesos.kafka.scheduler.http.BothParam
import ly.stealth.mesos.kafka.{ListTopicsResponse, RebalanceStartResponse}
Expand Down Expand Up @@ -59,6 +59,27 @@ trait TopicApiComponentImpl extends TopicApiComponent {
@DefaultValue("*") @BothParam("topic") expr: String
) = list(expr)

@Path("delete")
@POST
@Produces(Array(MediaType.APPLICATION_JSON))
def deletePost(
@FormParam("topic") topicExpr: String
): Response = {
if (topicExpr == null)
return Status.BadRequest("topic required")

val names = Expr.expandTopics(topicExpr).toSet

val missing = names.filter(cluster.topics.getTopic(_) == null)
if(missing.nonEmpty) {
Status.BadRequest(s"topic${if (missing.size > 1) "s" else ""} not found ${missing.mkString(",")}")
} else {
Response.ok(ListTopicsResponse(
names.toSeq.sorted.map(cluster.topics.deleteTopic(_))
)).build()
}
}

@Path("{op: (add|update)}")
@POST
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down
31 changes: 31 additions & 0 deletions src/test/ly/stealth/mesos/kafka/CliTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,37 @@ class CliTest extends KafkaMesosTestCase {
assertOutNotContains("x")
}

@Test
def topic_delete {
registry.cluster.topics.addTopic("t0")
registry.cluster.topics.addTopic("t1")
registry.cluster.topics.addTopic("x")
registry.cluster.topics.addTopic("z")

// delete single topic
exec("topic delete z")
assertOutContains("topic deleted:")
assertOutContains("z")
assertOutNotContains("t0")
assertOutNotContains("t1")
assertOutNotContains("x")

// delete pattern
exec("topic delete t*")
assertOutContains("topics deleted:")
assertOutContains("t0")
assertOutContains("t1")
assertOutNotContains("x")

// delete is done asynchronously
delay("500ms") {
// delete all
exec("topic delete *")
assertOutContains("topic deleted:")
assertOutContains("x")
}
}

@Test
def topic_add {
exec("topic add t0")
Expand Down
17 changes: 16 additions & 1 deletion src/test/ly/stealth/mesos/kafka/HttpServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,22 @@ class HttpServerTest extends KafkaMesosTestCase {
assertEquals("t0", t0Node.name)
assertEquals(Seq(0), t0Node.partitions(0))
}


@Test
def topic_delete {
registry.cluster.topics.addTopic("t0")
registry.cluster.topics.addTopic("t1")

val json = sendRequestObj[ListTopicsResponse]("/topic/delete", parseMap("topic=t*"))
val topicNodes = json.topics
assertEquals(2, topicNodes.size)

val t0Node = topicNodes.head
val t1Node = topicNodes.tail.head
assertEquals("t0", t0Node.name)
assertEquals("t1", t1Node.name)
}

@Test
def topic_add {
val topics = registry.cluster.topics
Expand Down
18 changes: 18 additions & 0 deletions src/test/ly/stealth/mesos/kafka/TopicsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ class TopicsTest extends KafkaMesosTestCase {
assertEquals(2, topics.getTopics.size)
}

@Test
def deleteTopics {
assertEquals(0, topics.getTopics.size)

topics.addTopic("t0")
topics.addTopic("t1")

assertEquals(2, topics.getTopics.size)

topics.deleteTopic("t0")
topics.deleteTopic("t1")

// delete is done asynchronously
delay("500ms") {
assertEquals(0, topics.getTopics.size)
}
}

@Test
def fairAssignment {
val assignment = topics.fairAssignment(3, 2, Seq(0, 1, 2), 0, 0)
Expand Down

0 comments on commit 70e91ee

Please sign in to comment.