
Commit

finalized for initial publication
bryansmith-db committed Sep 3, 2023
1 parent 8c42242 commit 8f96e66
Showing 3 changed files with 13 additions and 15 deletions.
2 changes: 1 addition & 1 deletion 02_Initial Workflow.py
@@ -165,7 +165,7 @@
args.setNumPartitions( sc.defaultParallelism * 20 ) # default parallelism reflects the Databricks cluster's capacity

# define sample size
-args.setLabelDataSampleSize(0.1)
+args.setLabelDataSampleSize(0.05)

# COMMAND ----------

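For context, the sample-size setting above sits alongside the rest of the Zingg Arguments configuration built earlier in this notebook. A minimal sketch, assuming the Zingg Python client (zingg.client.Arguments) that the notebook already imports; the model id and working directory below are illustrative only:

from zingg.client import Arguments

args = Arguments()
args.setModelId('zingg_initial')                    # illustrative model id
args.setZinggDir('/tmp/zingg')                      # illustrative working directory
args.setNumPartitions(sc.defaultParallelism * 20)   # scale Zingg's data partitioning to the cluster's parallelism
args.setLabelDataSampleSize(0.05)                   # fraction of records sampled when proposing candidate pairs to label

A smaller label sample keeps each findTrainingData pass fast; the trade-off is that more labeling rounds may be needed to accumulate enough marked pairs.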
14 changes: 7 additions & 7 deletions 03_Incremental Workflow.py
@@ -106,13 +106,13 @@
# DBTITLE 1,Config Model Inputs
# configure priors Zingg input pipe
priors_inputPipe = Pipe(name='priors', format='delta')
-priors_inputPipe.addProperty('path', prior_data_path)
+priors_inputPipe.addProperty('path', prior_data_dir)
args.setData(priors_inputPipe)

# configure incoming Zingg input pipe
-incoming_input_path = spark.sql("DESCRIBE DETAIL incremental").select('location').collect()[0]['location']
+incoming_input_dir = spark.sql("DESCRIBE DETAIL incremental").select('location').collect()[0]['location']
incoming_inputPipe = Pipe(name='incoming', format='delta')
-incoming_inputPipe.addProperty('path', incoming_input_path )
+incoming_inputPipe.addProperty('path', incoming_input_dir )

# set input data pipelines
args.setData(priors_inputPipe, incoming_inputPipe)
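With both pipes registered, the incremental notebook's next step is typically to link incoming records against the priors. A rough sketch of that call, assuming the Zingg Python client's ClientOptions / ZinggWithSpark wrappers used elsewhere in these notebooks; the exact invocation is not shown in this diff:

from zingg.client import ClientOptions, ZinggWithSpark

# run the link phase so incoming records are matched against previously resolved priors
options = ClientOptions([ClientOptions.PHASE, 'link'])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()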
@@ -186,7 +186,7 @@
spark
.read
.format('delta')
-.load(LINKED_OUTPUT_DIR)
+.load(linked_output_dir)
.selectExpr(
'z_score',
'z_cluster',
@@ -219,7 +219,7 @@
.join(
spark.table('cluster_members').alias('b'),
on=fn.expr("""
-a.recid=COALESCE(b.recid,-1)
+a.recid=COALESCE(b.recid,-1) AND
a.givenname=COALESCE(b.givenname,'') AND
a.surname=COALESCE(b.surname,'') AND
a.suburb=COALESCE(b.suburb,'') AND
@@ -311,9 +311,9 @@

# DBTITLE 1,Config Model Inputs
# configure incoming Zingg input pipe
-incoming_input_path = spark.sql("DESCRIBE DETAIL incremental").select('location').collect()[0]['location']
+incoming_input_dir = spark.sql("DESCRIBE DETAIL incremental").select('location').collect()[0]['location']
incoming_inputPipe = Pipe(name='incoming', format='delta')
-incoming_inputPipe.addProperty('path', incoming_input_path )
+incoming_inputPipe.addProperty('path', incoming_input_dir )

# set input data pipelines
args.setData(incoming_inputPipe)
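The DESCRIBE DETAIL lookup used here (and in the link configuration above) resolves a Delta table name to its storage location. If it were factored out, a small helper would keep the two cells consistent; the helper below is purely illustrative and not part of the notebook:

def delta_table_location(table_name):
    # DESCRIBE DETAIL returns a single row per Delta table; 'location' holds its storage path
    return spark.sql(f"DESCRIBE DETAIL {table_name}").select('location').collect()[0]['location']

incoming_input_dir = delta_table_location('incremental')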
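The join-condition change above restores a valid boolean expression: without the trailing AND, the multi-line string passed to fn.expr would fail to parse. The COALESCE calls serve a separate purpose, substituting sentinel values (-1, '') for NULL attributes on the cluster_members side so that a missing attribute there does not force the predicate to NULL and silently drop the row under SQL's three-valued logic. A one-line illustration of that behavior:

spark.sql("SELECT NULL = NULL AS plain_equality, '' = COALESCE(NULL, '') AS with_sentinel").show()
# plain_equality comes back NULL (so such a row is excluded by the join); with_sentinel is true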
12 changes: 5 additions & 7 deletions RUNME.py
@@ -25,6 +25,7 @@

# DBTITLE 0,Install util packages
# MAGIC %pip install git+https://github.com/databricks-academy/dbacademy@v1.0.13 git+https://github.com/databricks-industry-solutions/notebook-solution-companion@safe-print-html --quiet --disable-pip-version-check
+# MAGIC dbutils.library.restartPython()

# COMMAND ----------

@@ -109,11 +110,11 @@
{
"job_cluster_key": "zingg_cluster",
"new_cluster": {
"spark_version": "12.2.x-cpu-ml-scala2.12",
"spark_version": "12.2.x-scala2.12",
"spark_conf": {
"spark.databricks.delta.formatCheck.enabled": "false"
},
"num_workers": 2,
"num_workers": 4,
"node_type_id": {"AWS": "i3.xlarge", "MSA": "Standard_DS3_v2", "GCP": "n1-highmem-4"}, # different from standard API
"custom_tags": {
"usage": "solacc_automation"
@@ -129,8 +130,5 @@

dbutils.widgets.dropdown("run_job", "False", ["True", "False"])
run_job = dbutils.widgets.get("run_job") == "True"
-NotebookSolutionCompanion().deploy_compute(job_json, run_job=run_job)
-
-# COMMAND ----------
-
-
+nsc = NotebookSolutionCompanion()
+nsc.deploy_compute(job_json, run_job=run_job)

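For readers running RUNME.py standalone: the cluster block edited above is one entry in the job_json definition handed to the companion library, and the dropdown widget gates whether the deployed job is actually executed. A rough skeleton under those assumptions; only the fields visible in this diff are taken from the notebook, the rest is illustrative:

job_json = {
    "job_clusters": [
        {
            "job_cluster_key": "zingg_cluster",
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "spark_conf": {"spark.databricks.delta.formatCheck.enabled": "false"},
                "num_workers": 4,
                "node_type_id": {"AWS": "i3.xlarge", "MSA": "Standard_DS3_v2", "GCP": "n1-highmem-4"},
                "custom_tags": {"usage": "solacc_automation"},
            },
        }
    ],
    "tasks": [],  # notebook task definitions omitted; each one points at job_cluster_key 'zingg_cluster'
}

dbutils.widgets.dropdown("run_job", "False", ["True", "False"])
run_job = dbutils.widgets.get("run_job") == "True"

nsc = NotebookSolutionCompanion()  # imported earlier in RUNME.py
nsc.deploy_compute(job_json, run_job=run_job)  # deploys the job; triggers a run only when run_job is True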