21 changes: 14 additions & 7 deletions change-data-capture-example/notebooks/1-CDC_DataGenerator.py
@@ -19,15 +19,15 @@

from pyspark.sql import functions as F
from faker import Faker
from collections import OrderedDict

[Author comment] Lines 22-65 refactored with the following changes:
import uuid

folder = "/tmp/demo/cdc_raw"
#dbutils.fs.rm(folder, True)
try:
    dbutils.fs.ls(folder)
except:
    print("folder doesn't exist, generating the data...")
    fake = Faker()
    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
@@ -47,8 +47,9 @@
df = df.withColumn("operation", fake_operation())
df = df.withColumn("operation_date", fake_date())

- df.repartition(100).write.format("json").mode("overwrite").save(folder+"/customers")
+ df.repartition(100).write.format("json").mode("overwrite").save(
+     f"{folder}/customers")

df = spark.range(0, 10000)
df = df.withColumn("id", fake_id())
df = df.withColumn("transaction_date", fake_date())
@@ -57,9 +58,15 @@
df = df.withColumn("operation", fake_operation())
df = df.withColumn("operation_date", fake_date())
#Join with the customer to get the same IDs generated.
- df = df.withColumn("t_id", F.monotonically_increasing_id()).join(spark.read.json(folder+"/customers").select("id").withColumnRenamed("id", "customer_id").withColumn("t_id", F.monotonically_increasing_id()), "t_id").drop("t_id")
- df.repartition(10).write.format("json").mode("overwrite").save(folder+"/transactions")
+ df = (df.withColumn("t_id", F.monotonically_increasing_id()).join(
+     spark.read.json(f"{folder}/customers").select("id").withColumnRenamed(
+         "id", "customer_id").withColumn("t_id",
+                                         F.monotonically_increasing_id()),
+     "t_id",
+ ).drop("t_id"))
+ df.repartition(10).write.format("json").mode("overwrite").save(
+     f"{folder}/transactions")

# COMMAND ----------

- spark.read.json(folder+"/customers").display()
+ spark.read.json(f"{folder}/customers").display()
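For readers following along, the core pattern in this notebook (wrap Faker generators in Spark UDFs, stamp each row, write JSON) can be sketched minimally as follows. This is a sketch under assumptions, not the notebook's exact code: the output path and column names are illustrative, and it assumes `faker` is installed and a Spark session is available.

```python
from pyspark.sql import SparkSession, functions as F
from faker import Faker

spark = SparkSession.builder.getOrCreate()
fake = Faker()

# Each UDF invokes a Faker generator once per row.
fake_firstname = F.udf(fake.first_name)
fake_email = F.udf(fake.ascii_company_email)

df = (spark.range(0, 100)
      .withColumn("firstname", fake_firstname())
      .withColumn("email", fake_email()))

# Illustrative output path (assumption).
df.write.format("json").mode("overwrite").save("/tmp/demo/cdc_raw_sketch")
```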
divvy-bike-demo/python-divvybike-api-ingest-stationinfo.py
@@ -44,7 +44,7 @@
    dbutils.fs.put(f"{api_resp_path}/station_info_{fmt_now}.json", "")
except:
    print('File already exists')

[Author comment] Found the following improvement in Lines 47-59:

# Call API & get JSON response:
#----------------------------------------------------------
resp = requests.get(station_information)
@@ -56,6 +56,6 @@
resp_json_str = resp.content.decode("utf-8")
print("Byte size of JSON Response: ", len(resp_json_str))
#----------------------------------------------------------

with open(f"/dbfs/{api_resp_path}/station_info_{fmt_now}.json","w") as f:
    f.write(resp_json_str)
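The ingest pattern shown here and in the station-status script is: GET the feed, check the status code, decode the bytes, and write a timestamped JSON file. A standalone sketch of that flow follows; the endpoint URL and output directory are assumptions, not values from the repo.

```python
import os
import requests
from datetime import datetime

# Illustrative endpoint and output directory (assumptions).
url = "https://gbfs.divvybikes.com/gbfs/en/station_information.json"
out_dir = "/tmp/api_responses"
os.makedirs(out_dir, exist_ok=True)

resp = requests.get(url, timeout=30)
if resp.status_code != 200:
    raise RuntimeError(f"API call failed: {resp.status_code}")

resp_json_str = resp.content.decode("utf-8")
print("Byte size of JSON response:", len(resp_json_str))

fmt_now = datetime.now().strftime("%Y%m%d_%H-%M-%S")
with open(f"{out_dir}/station_info_{fmt_now}.json", "w") as f:
    f.write(resp_json_str)
```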
4 changes: 2 additions & 2 deletions divvy-bike-demo/python-divvybike-api-ingest-stationstatus.py
@@ -47,7 +47,7 @@
    dbutils.fs.put(f"{api_resp_path}/station_status_{fmt_now}.json", "")
except:
    print('File already exists')

[Author comment] Found the following improvement in Lines 50-62:

# Call API & get JSON response:
#----------------------------------------------------------
resp = requests.get(station_status)
@@ -59,6 +59,6 @@
resp_json_str = resp.content.decode("utf-8")
print("Byte size of JSON Response: ", len(resp_json_str))
#----------------------------------------------------------

with open(f"/dbfs/{api_resp_path}/station_status_{fmt_now}.json","w") as f:
    f.write(resp_json_str)
11 changes: 6 additions & 5 deletions divvy-bike-demo/python-weatherinfo-api-ingest.py
@@ -64,13 +64,13 @@
now = datetime.now() # current date and time
fmt_now = now.strftime("%Y%m%d_%H-%M-%S")
rec_cnt = rec_cnt + 1 # Increment counter

print('---------------------------------------------------')
print('Processing Record Number: ', rec_cnt)

# Define the full API call for current record in the DataFrame
full_url = url_part1 + str(row['lat']) + "&lon=" + str(row['lon']) + url_part2 + api_key

[Author comment on lines -67 to +73] Lines 67-89 refactored with the following changes:

# Call API & get JSON response:
resp = requests.get(full_url)
if resp.status_code != 200:
Expand All @@ -82,11 +82,12 @@
print("Byte size of JSON Response: ", len(resp_json_str))

#Define full path + file name string
- full_string = api_resp_path_weather + "/weather_info_" + fmt_now + "_station_" + row['station_id'] + ".json"
+ full_string = (f"{api_resp_path_weather}/weather_info_{fmt_now}_station_" +
+                row['station_id'] + ".json")

# Write to DBFS dir
dbutils.fs.put(full_string, resp_json_str, True)

# Wait so we limit to 1 API call per second
print(f"Sleeping for {sleep_time} seconds")
time.sleep(sleep_time)
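The surrounding loop builds one request per DataFrame row and sleeps between calls to stay under the API's rate limit. A minimal standalone sketch of that pattern, with made-up coordinates and a placeholder key (both assumptions):

```python
import time
import requests
import pandas as pd

stations = pd.DataFrame(
    {"station_id": ["s1", "s2"], "lat": [41.88, 41.90], "lon": [-87.62, -87.64]}
)
base_url = "https://api.openweathermap.org/data/2.5/weather"  # illustrative
api_key = "YOUR_API_KEY"  # placeholder
sleep_time = 1  # keep to roughly one call per second

for _, row in stations.iterrows():
    full_url = f"{base_url}?lat={row['lat']}&lon={row['lon']}&appid={api_key}"
    resp = requests.get(full_url, timeout=30)
    if resp.status_code != 200:
        print("Request failed with status:", resp.status_code)
        continue
    print("Byte size of JSON response:", len(resp.content))
    time.sleep(sleep_time)  # crude client-side rate limiting
```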
9 changes: 4 additions & 5 deletions dms-dlt-cdc-demo/resources/utils/dlt_runner.py
@@ -10,12 +10,11 @@ def create_api_client(profile=''):
try:
    if profile == '':
        config = EnvironmentVariableConfigProvider().get_config()
-   else:
-       if profile == 'default':
-           config = ProfileConfigProvider().get_config()
-       else:
-           config = ProfileConfigProvider(profile).get_config()
+   elif profile == 'default':
+       config = ProfileConfigProvider().get_config()
+   else:
+       config = ProfileConfigProvider(profile).get_config()

[Author comment on lines +13 to +17] Function create_api_client refactored with the following changes:

    api_client = _get_api_client(config, command_name="blog-dms-cdc-demo")
except:
    print("Failed to create api client.")
@@ -270,7 +270,7 @@
}}
}}'''

import numpy as np

[Author comment] Lines 273-396 refactored with the following changes:

from datetime import datetime, timedelta
import math
import calendar
@@ -282,7 +282,7 @@
for i in range(1, 200):
time.sleep(1)
print(resolved_list.keys())
- if str(i%10) in resolved_list.keys():
+ if str(i % 10) in resolved_list:
continue
resolved = "N"
curr_entry = debezium_str.format('2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, 'null', str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, '2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, 'null', str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, "i")
@@ -293,7 +293,11 @@
if resolved == "Y":
resolved_list[str(i%10)] = "Y"
curr_entry = debezium_str.format('2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, '2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, "u")
dbutils.fs.put("/home/fs/banking_personalization/customer_patterns_{}.json".format(str(i)), curr_entry, True)
dbutils.fs.put(
f"/home/fs/banking_personalization/customer_patterns_{str(i)}.json",
curr_entry,
True,
)

# COMMAND ----------

@@ -380,20 +384,17 @@

from datetime import datetime
curr_time = datetime.now()
import pandas as pd
import numpy as np
pdf = pd.DataFrame({datetime.now()}, columns=['updt_ts'])

debit_or_credit = np.random.uniform(0, 1, 1)
df = spark.range(10).withColumn("customer_id", col("id")%10).withColumn("scheduled_payment", col("id")%2).withColumn("txn_amount", col("id")%100).withColumn("debit_or_credit", when(col("customer_id") <= 3, round(rand())).otherwise(0)).drop("id").join(spark.createDataFrame(pdf)).withColumn("initial_balance", when(col("customer_id")%4 == 0, lit(150000)).otherwise(lit(10000)))

- i = 0
- while i < 100:
+ for i in range(100):
      time.sleep(1)
      pdf = pd.DataFrame({datetime.now()+timedelta(days=i)}, columns=['updt_ts'])
      df = df.union(spark.range(10).withColumn("id", col("id")+10*i).withColumn("customer_id", col("id")%10).withColumn("scheduled_payment", col("id")%2).withColumn("txn_amount", col("id")%100).withColumn("debit_or_credit", when(col("customer_id") <= 3, 1).otherwise(round(round(rand())))).drop("id").join(spark.createDataFrame(pdf)).withColumn("initial_balance", when(col("customer_id")%4 == 0, lit(150000)).otherwise(lit(10000))))
-     i =i+1
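Two micro-refactors in this file are easy to miss: membership tests go straight against the dict (`x in d` is equivalent to `x in d.keys()` and avoids the extra view object), and a hand-rolled counter loop becomes `for i in range(n)`. A standalone illustration:

```python
resolved_list = {"3": "Y"}

# `in` on a dict already tests keys; .keys() adds nothing.
assert ("3" in resolved_list) == ("3" in resolved_list.keys())

# for-range replaces the i = 0 / while i < 100 / i = i + 1 pattern.
total = 0
for i in range(100):
    total += i
assert total == 4950
```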

# COMMAND ----------

# DBTITLE 1,Stage Data in Delta Table
12 changes: 9 additions & 3 deletions ml models/loan risk ml model.py
@@ -3,7 +3,7 @@
targetName = "yourTargetDatabase"

# get the training set from the Loan Risk DLT pipeline
trainSql = "SELECT * FROM " + targetName + ".train_data"
trainSql = f"SELECT * FROM {targetName}.train_data"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines 6-25 refactored with the following changes:

train = spark.sql(trainSql)

display(train)
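The change here is purely textual: an f-string builds the same SQL text as the old concatenation. A quick check using the notebook's placeholder database name; note that for user-supplied values a parameterized API is safer than either form:

```python
targetName = "yourTargetDatabase"  # placeholder from the notebook

before = "SELECT * FROM " + targetName + ".train_data"
after = f"SELECT * FROM {targetName}.train_data"
assert before == after  # identical query text, clearer construction
```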
@@ -20,9 +20,15 @@
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs", "revol_util", "total_acc", "credit_length_in_years"]

# Establish stages for our GBT model
- indexers = map(lambda c: StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid = 'keep'), categoricals)
+ indexers = map(
+     lambda c: StringIndexer(
+         inputCol=c, outputCol=f"{c}_idx", handleInvalid='keep'
+     ),
+     categoricals,
+ )

imputers = Imputer(inputCols = numerics, outputCols = numerics)
- featureCols = list(map(lambda c: c+"_idx", categoricals)) + numerics
+ featureCols = list(map(lambda c: f"{c}_idx", categoricals)) + numerics

# Define vector assemblers
model_matrix_stages = list(indexers) + [imputers] + \
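One caveat about the `map(lambda ...)` form this refactor keeps: `map` returns a lazy, single-use iterator, which is why the notebook later materializes it with `list(indexers)`. A list comprehension is the more direct idiom; here is a sketch with an illustrative subset of the columns (an alternative reading, not the PR's change):

```python
from pyspark.ml.feature import StringIndexer, Imputer

categoricals = ["term", "home_ownership", "purpose"]  # illustrative subset
numerics = ["loan_amnt", "annual_inc", "dti"]         # illustrative subset

# Eager and explicit: one StringIndexer stage per categorical column.
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in categoricals
]
imputers = Imputer(inputCols=numerics, outputCols=numerics)
featureCols = [f"{c}_idx" for c in categoricals] + numerics
model_matrix_stages = indexers + [imputers]
```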
9 changes: 3 additions & 6 deletions python/DLT Event Log Queries.py
@@ -6,7 +6,7 @@

# Fill in the pipelines_id, pipeline_name and storage_location
pipelines_id = "" #used to find the event path if storage location is blank
pipeline_name = ""

[Author comment] Lines 9-25 refactored with the following changes:

storage_location = "" #may be blank, in which case, the pipelines_id is used for the event path

from pyspark.sql.functions import *
@@ -18,11 +18,8 @@

event_location = ""

- if storage_location != "":
-     event_location = "dbfs:" + storage_location + "/system/events/"
- else:
-     event_location = "dbfs:/pipelines/" + pipelines_id + "/system/events/"
+ event_location = (f"dbfs:{storage_location}/system/events/" if storage_location
+                   else f"dbfs:/pipelines/{pipelines_id}/system/events/")
event_location
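The four-line if/else collapses into one conditional expression, relying on the fact that a non-empty string is truthy. A standalone check with illustrative values:

```python
storage_location = ""       # blank, so fall back to the pipeline-id path
pipelines_id = "1234-abcd"  # illustrative id

event_location = (f"dbfs:{storage_location}/system/events/" if storage_location
                  else f"dbfs:/pipelines/{pipelines_id}/system/events/")
assert event_location == "dbfs:/pipelines/1234-abcd/system/events/"
```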

# COMMAND ----------
12 changes: 2 additions & 10 deletions python/Loan Risk.py
@@ -115,11 +115,7 @@ def features():
def train_data():
# Setup dataset
features = dlt.read("features").select(myX + [myY, "int_rate", "net", "issue_year"])
- train_data = features.filter(features.issue_year <= 2015)
-
- return (
-     train_data
- )
+ return features.filter(features.issue_year <= 2015)

[Author comment] Function train_data refactored with the following changes:



# COMMAND ----------
@@ -133,8 +129,4 @@ def train_data():
def valid_data():
# Setup dataset
features = dlt.read("features").select(myX + [myY, "int_rate", "net", "issue_year"])
- valid_data = features.filter(features.issue_year > 2015)
-
- return (
-     valid_data
- )
+ return features.filter(features.issue_year > 2015)

[Author comment on lines -136 to +132] Function valid_data refactored with the following changes:

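Both table functions get the same treatment: a variable assigned once and immediately returned is inlined into the return statement. A minimal DLT-flavored sketch, assuming the `dlt` module and an upstream `features` table as in the notebook (the comment text is illustrative):

```python
import dlt

@dlt.table(comment="Training rows: loans issued in or before 2015")
def train_data():
    # Filter and return in one step; no intermediate name needed.
    return dlt.read("features").filter("issue_year <= 2015")
```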
32 changes: 22 additions & 10 deletions python/Retail Sales.py
@@ -66,13 +66,19 @@ def sales_orders_cleaned():
}
)
def sales_order_in_la():
df = dlt.read_stream("sales_orders_cleaned").where("city == 'Los Angeles'")
df = df.select(df.city, df.order_date, df.customer_id, df.customer_name, explode(df.ordered_products).alias("ordered_products_explode"))

- dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name, df.ordered_products_explode.curr.alias("currency"))\
-     .agg(sum(df.ordered_products_explode.price).alias("sales"), sum(df.ordered_products_explode.qty).alias("qantity"))
-
- return dfAgg
+ return df.groupBy(
+     df.order_date,
+     df.city,
+     df.customer_id,
+     df.customer_name,
+     df.ordered_products_explode.curr.alias("currency"),
+ ).agg(
+     sum(df.ordered_products_explode.price).alias("sales"),
+     sum(df.ordered_products_explode.qty).alias("qantity"),
+ )
[Author comment on lines -69 to +81] Function sales_order_in_la refactored with the following changes:



# COMMAND ----------
@@ -85,10 +91,16 @@ def sales_order_in_la():
}
)
def sales_order_in_chicago():
df = dlt.read_stream("sales_orders_cleaned").where("city == 'Chicago'")
df = df.select(df.city, df.order_date, df.customer_id, df.customer_name, explode(df.ordered_products).alias("ordered_products_explode"))

- dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name, df.ordered_products_explode.curr.alias("currency"))\
-     .agg(sum(df.ordered_products_explode.price).alias("sales"), sum(df.ordered_products_explode.qty).alias("qantity"))
-
- return dfAgg
+ return df.groupBy(
+     df.order_date,
+     df.city,
+     df.customer_id,
+     df.customer_name,
+     df.ordered_products_explode.curr.alias("currency"),
+ ).agg(
+     sum(df.ordered_products_explode.price).alias("sales"),
+     sum(df.ordered_products_explode.qty).alias("qantity"),
+ )
[Author comment on lines -88 to +106] Function sales_order_in_chicago refactored with the following changes:

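Since `sales_order_in_la` and `sales_order_in_chicago` now differ only in the city literal, a parameterized helper would remove the remaining duplication. A sketch of that direction (not part of this PR; table comments are illustrative, and the `quantity` alias fixes the notebook's `qantity` typo):

```python
import dlt
from pyspark.sql.functions import explode, sum

def city_sales(city_name):
    df = dlt.read_stream("sales_orders_cleaned").where(f"city == '{city_name}'")
    df = df.select(df.city, df.order_date, df.customer_id, df.customer_name,
                   explode(df.ordered_products).alias("product"))
    return df.groupBy(
        df.order_date, df.city, df.customer_id, df.customer_name,
        df.product.curr.alias("currency"),
    ).agg(sum(df.product.price).alias("sales"),
          sum(df.product.qty).alias("quantity"))

@dlt.table(comment="Sales orders in LA")
def sales_order_in_la():
    return city_sales("Los Angeles")

@dlt.table(comment="Sales orders in Chicago")
def sales_order_in_chicago():
    return city_sales("Chicago")
```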
9 changes: 4 additions & 5 deletions twitter-dlt-huggingface-demo/Twitter-Stream-S3.py
@@ -77,13 +77,12 @@ def on_status(self, status):

def write_file(self):
file_timestamp = calendar.timegm(time.gmtime())
- fname = self.filename + '/tweets_' + str(file_timestamp) + '.json'
+ fname = f'{self.filename}/tweets_{str(file_timestamp)}.json'

- f = open(fname, 'w')
- for tweet in self.tweet_stack:
-     f.write(jsonpickle.encode(tweet._json, unpicklable=False) + '\n')
- f.close()
+ with open(fname, 'w') as f:
+     for tweet in self.tweet_stack:
+         f.write(jsonpickle.encode(tweet._json, unpicklable=False) + '\n')
[Author comment on lines -80 to +85] Function TweetStream.write_file refactored with the following changes:

print("Wrote local file ", fname)

def on_error(self, status_code):
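The point of the change: `with open(...)` closes the file even if `jsonpickle.encode` raises partway through the loop, which the manual `open`/`close` pair did not guarantee. The same pattern, self-contained (path and payload are illustrative):

```python
import json

tweets = [{"id": 1, "text": "hello"}, {"id": 2, "text": "world"}]  # stand-in data

# The context manager closes the file on success and on exception alike.
with open("/tmp/tweets_demo.json", "w") as f:
    for tweet in tweets:
        f.write(json.dumps(tweet) + "\n")
```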