Sourcery refactored main branch #1
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -44,7 +44,7 @@
| dbutils.fs.put(f"{api_resp_path}/station_info_{fmt_now}.json", "") | ||
| except: | ||
| print('File already exists') | ||
Sourcery: found the following improvement in these lines.
| # Call API & get JSON response: | ||
| #---------------------------------------------------------- | ||
| resp = requests.get(station_information) | ||
@@ -56,6 +56,6 @@
| resp_json_str = resp.content.decode("utf-8") | ||
| print("Byte size of JSON Response: ", len(resp_json_str)) | ||
| #---------------------------------------------------------- | ||
| with open(f"/dbfs/{api_resp_path}/station_info_{fmt_now}.json","w") as f: | ||
| f.write(resp_json_str) | ||
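For context, the hunks above follow the notebook's fetch-and-persist pattern: create a placeholder file on DBFS, request the station feed, then write the decoded JSON under the local `/dbfs` mount. A minimal standalone sketch of that pattern, with hypothetical values for the feed URL and output path (`dbutils` only exists inside a Databricks runtime, so the placeholder step is omitted here):

```python
from datetime import datetime

import requests

# Hypothetical stand-ins; in the notebook these are defined in earlier cells.
station_information = "https://example.com/station_information.json"
api_resp_path = "tmp/api_responses"
fmt_now = datetime.now().strftime("%Y%m%d_%H-%M-%S")

# Call API & get JSON response
resp = requests.get(station_information)
resp.raise_for_status()  # surface non-200 responses instead of writing bad data

resp_json_str = resp.content.decode("utf-8")
print("Byte size of JSON Response: ", len(resp_json_str))

# On Databricks, dbfs:/<path> is mirrored locally at /dbfs/<path>,
# so a plain open() call can write the response file.
with open(f"/dbfs/{api_resp_path}/station_info_{fmt_now}.json", "w") as f:
    f.write(resp_json_str)
```

The station_status notebook below follows the identical pattern.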
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -47,7 +47,7 @@
| dbutils.fs.put(f"{api_resp_path}/station_status_{fmt_now}.json", "") | ||
| except: | ||
| print('File already exists') | ||
Sourcery: found the following improvement in these lines.
| # Call API & get JSON response: | ||
| #---------------------------------------------------------- | ||
| resp = requests.get(station_status) | ||
@@ -59,6 +59,6 @@
| resp_json_str = resp.content.decode("utf-8") | ||
| print("Byte size of JSON Response: ", len(resp_json_str)) | ||
| #---------------------------------------------------------- | ||
| with open(f"/dbfs/{api_resp_path}/station_status_{fmt_now}.json","w") as f: | ||
| f.write(resp_json_str) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -64,13 +64,13 @@
| now = datetime.now() # current date and time | ||
| fmt_now = now.strftime("%Y%m%d_%H-%M-%S") | ||
| rec_cnt = rec_cnt + 1 # Increment counter | ||
| print('---------------------------------------------------') | ||
| print('Processing Record Number: ', rec_cnt) | ||
| # Define the full API call for current record in the DataFrame | ||
| full_url = url_part1 + str(row['lat']) + "&lon=" + str(row['lon']) + url_part2 + api_key | ||
Sourcery, on lines -67 to +73: found a suggested improvement in these lines.
| # Call API & get JSON response: | ||
| resp = requests.get(full_url) | ||
| if resp.status_code != 200: | ||
@@ -82,11 +82,12 @@
| print("Byte size of JSON Response: ", len(resp_json_str)) | ||
| #Define full path + file name string | ||
| full_string = api_resp_path_weather + "/weather_info_" + fmt_now + "_station_" + row['station_id'] + ".json" | ||
| full_string = (f"{api_resp_path_weather}/weather_info_{fmt_now}_station_" + | ||
| row['station_id'] + ".json") | ||
| # Write to DBFS dir | ||
| dbutils.fs.put(full_string, resp_json_str, True) | ||
| # Wait so we limit to 1 API call per second | ||
| print(f"Sleeping for {sleep_time} seconds") | ||
| time.sleep(sleep_time) | ||
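The `full_string` change above replaces chained concatenation with an f-string. A rough before/after, with hypothetical values, folding the remaining `station_id` concatenation into the f-string as well:

```python
api_resp_path_weather = "/tmp/weather"  # hypothetical output dir
fmt_now = "20240101_12-00-00"           # hypothetical formatted timestamp
station_id = "72"                       # hypothetical station id

# Before: five concatenated fragments, easy to drop a separator
full_string = api_resp_path_weather + "/weather_info_" + fmt_now + "_station_" + station_id + ".json"

# After: one f-string, with each value named where it appears
full_string = f"{api_resp_path_weather}/weather_info_{fmt_now}_station_{station_id}.json"
print(full_string)
```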
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -10,12 +10,11 @@ def create_api_client(profile=''):
| try: | ||
| if profile == '': | ||
| config = EnvironmentVariableConfigProvider().get_config() | ||
| elif profile == 'default': | ||
| config = ProfileConfigProvider().get_config() | ||
| else: | ||
| if profile == 'default': | ||
| config = ProfileConfigProvider().get_config() | ||
| else: | ||
| config = ProfileConfigProvider(profile).get_config() | ||
| config = ProfileConfigProvider(profile).get_config() | ||
Sourcery, on lines +13 to +17: Function create_api_client refactored (the nested else/if is merged into an elif).
| api_client = _get_api_client(config, command_name="blog-dms-cdc-demo") | ||
| except: | ||
| print("Failed to create api client.") | ||
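The create_api_client change merges an `else:` containing an `if`/`else` into a flat `elif` chain, removing one nesting level without changing behaviour. A minimal sketch of the same control flow, using a hypothetical config lookup in place of the Databricks CLI providers:

```python
def pick_config(profile=""):
    # Merged form: the old code nested `if profile == 'default'` inside else.
    if profile == "":
        return {"source": "environment variables"}  # hypothetical config values
    elif profile == "default":
        return {"source": "default profile"}
    else:
        return {"source": f"profile {profile}"}

print(pick_config())           # {'source': 'environment variables'}
print(pick_config("default"))  # {'source': 'default profile'}
print(pick_config("staging"))  # {'source': 'profile staging'}
```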
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -270,7 +270,7 @@
| }} | ||
| }}''' | ||
| import numpy as np | ||
| import numpy as np | ||
Sourcery: found the following improvement in these lines.
| from datetime import datetime, timedelta | ||
| import math | ||
| import calendar | ||
@@ -282,7 +282,7 @@
| for i in range(1, 200): | ||
| time.sleep(1) | ||
| print(resolved_list.keys()) | ||
| if str(i%10) in resolved_list.keys(): | ||
| if str(i % 10) in resolved_list: | ||
| continue | ||
| resolved = "N" | ||
| curr_entry = debezium_str.format('2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, 'null', str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, '2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, 'null', str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, "i") | ||
@@ -293,7 +293,11 @@
| if resolved == "Y": | ||
| resolved_list[str(i%10)] = "Y" | ||
| curr_entry = debezium_str.format('2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, '2022-01-01', math.floor(math.log(i)+5), math.floor(math.log(i)), math.floor(math.log(i+10)), i, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, str(i%10), calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, calendar.timegm((datetime.now()+timedelta(days=i)).timetuple())*1000, "u") | ||
| dbutils.fs.put("/home/fs/banking_personalization/customer_patterns_{}.json".format(str(i)), curr_entry, True) | ||
| dbutils.fs.put( | ||
| f"/home/fs/banking_personalization/customer_patterns_{str(i)}.json", | ||
| curr_entry, | ||
| True, | ||
| ) | ||
| # COMMAND ---------- | ||
@@ -380,20 +384,17 @@
| from datetime import datetime | ||
| curr_time = datetime.now() | ||
| import pandas as pd | ||
| import pandas as pd | ||
| import numpy as np | ||
| pdf = pd.DataFrame({datetime.now()}, columns=['updt_ts']) | ||
| debit_or_credit = np.random.uniform(0, 1, 1) | ||
| df = spark.range(10).withColumn("customer_id", col("id")%10).withColumn("scheduled_payment", col("id")%2).withColumn("txn_amount", col("id")%100).withColumn("debit_or_credit", when(col("customer_id") <= 3, round(rand())).otherwise(0)).drop("id").join(spark.createDataFrame(pdf)).withColumn("initial_balance", when(col("customer_id")%4 == 0, lit(150000)).otherwise(lit(10000))) | ||
| i = 0 | ||
| while i < 100: | ||
| for i in range(100): | ||
| time.sleep(1) | ||
| pdf = pd.DataFrame({datetime.now()+timedelta(days=i)}, columns=['updt_ts']) | ||
| df = df.union(spark.range(10).withColumn("id", col("id")+10*i).withColumn("customer_id", col("id")%10).withColumn("scheduled_payment", col("id")%2).withColumn("txn_amount", col("id")%100).withColumn("debit_or_credit", when(col("customer_id") <= 3, 1).otherwise(round(round(rand())))).drop("id").join(spark.createDataFrame(pdf)).withColumn("initial_balance", when(col("customer_id")%4 == 0, lit(150000)).otherwise(lit(10000)))) | ||
| i =i+1 | ||
| # COMMAND ---------- | ||
| # DBTITLE 1,Stage Data in Delta Table | ||
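Two of the rewrites above are worth noting: `str(i % 10) in resolved_list` drops the redundant `.keys()` call, since membership on a dict already tests keys, and the manually incremented while loop becomes `for i in range(100)`. A compact sketch of the loop conversion, with a hypothetical body:

```python
def process(i):
    print(f"processing batch {i}")  # hypothetical stand-in for the loop body

# Before: counter bookkeeping is spread across three lines
i = 0
while i < 100:
    process(i)
    i = i + 1

# After: range() manages the counter, so it cannot be mis-incremented
for i in range(100):
    process(i)
```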
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -3,7 +3,7 @@
| targetName = "yourTargetDatabase" | ||
| # get the training set from the Loan Risk DLT pipeline | ||
| trainSql = "SELECT * FROM " + targetName + ".train_data" | ||
| trainSql = f"SELECT * FROM {targetName}.train_data" | ||
Sourcery: found the following improvement in these lines.
| train = spark.sql(trainSql) | ||
| display(train) | ||
@@ -20,9 +20,15 @@
| numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs", "revol_util", "total_acc", "credit_length_in_years"] | ||
| # Establish stages for our GBT model | ||
| indexers = map(lambda c: StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid = 'keep'), categoricals) | ||
| indexers = map( | ||
| lambda c: StringIndexer( | ||
| inputCol=c, outputCol=f"{c}_idx", handleInvalid='keep' | ||
| ), | ||
| categoricals, | ||
| ) | ||
| imputers = Imputer(inputCols = numerics, outputCols = numerics) | ||
| featureCols = list(map(lambda c: c+"_idx", categoricals)) + numerics | ||
| featureCols = list(map(lambda c: f"{c}_idx", categoricals)) + numerics | ||
| # Define vector assemblers | ||
| model_matrix_stages = list(indexers) + [imputers] + \ | ||
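The reformatted `indexers` expression builds one StringIndexer per categorical column. A self-contained sketch of that stage construction, using list comprehensions (the more common Python spelling of the same `map`/`lambda` logic) and hypothetical column names:

```python
from pyspark.ml.feature import Imputer, StringIndexer

categoricals = ["grade", "home_ownership"]  # hypothetical categorical columns
numerics = ["loan_amnt", "dti"]             # hypothetical numeric columns

# One indexer per categorical column; handleInvalid='keep' buckets unseen labels
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in categoricals
]

imputers = Imputer(inputCols=numerics, outputCols=numerics)

# Feature columns for the assembler: indexed categoricals plus raw numerics
featureCols = [f"{c}_idx" for c in categoricals] + numerics
print(featureCols)  # ['grade_idx', 'home_ownership_idx', 'loan_amnt', 'dti']
```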
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -6,7 +6,7 @@
| # Fill in the pipelines_id, pipeline_name and storage_location | ||
| pipelines_id = "" #used to find the event path if storage location is blank | ||
| pipeline_name = "" | ||
| pipeline_name = "" | ||
Sourcery: found the following improvement in these lines.
| storage_location = "" #may be blank, in which case, the pipelines_id is used for the event path | ||
| from pyspark.sql.functions import * | ||
@@ -18,11 +18,8 @@
| event_location = "" | ||
| if storage_location != "": | ||
| event_location = "dbfs:" + storage_location + "/system/events/" | ||
| else: | ||
| event_location = "dbfs:/pipelines/" + pipelines_id + "/system/events/" | ||
| event_location = (f"dbfs:{storage_location}/system/events/" if storage_location | ||
| else f"dbfs:/pipelines/{pipelines_id}/system/events/") | ||
| event_location | ||
| # COMMAND ---------- | ||
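The event-path change collapses an if/else assignment into a single conditional expression and drops the throwaway `event_location = ""` initialisation; an empty `storage_location` is falsy, so the bare truth test matches the old `!= ""` comparison. Standalone, with a hypothetical pipeline id:

```python
pipelines_id = "1234-abcd"  # hypothetical pipeline id
storage_location = ""       # may be blank, per the notebook's comment

event_location = (f"dbfs:{storage_location}/system/events/" if storage_location
                  else f"dbfs:/pipelines/{pipelines_id}/system/events/")
print(event_location)  # dbfs:/pipelines/1234-abcd/system/events/
```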
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -115,11 +115,7 @@ def features():
| def train_data(): | ||
| # Setup dataset | ||
| features = dlt.read("features").select(myX + [myY, "int_rate", "net", "issue_year"]) | ||
| train_data = features.filter(features.issue_year <= 2015) | ||
| return ( | ||
| train_data | ||
| ) | ||
| return features.filter(features.issue_year <= 2015) | ||
Sourcery: Function train_data refactored (the immediately returned variable is inlined).
| # COMMAND ---------- | ||
@@ -133,8 +129,4 @@ def train_data():
| def valid_data(): | ||
| # Setup dataset | ||
| features = dlt.read("features").select(myX + [myY, "int_rate", "net", "issue_year"]) | ||
| valid_data = features.filter(features.issue_year > 2015) | ||
| return ( | ||
| valid_data | ||
| ) | ||
| return features.filter(features.issue_year > 2015) | ||
Sourcery, on lines -136 to +132: Function valid_data refactored (the immediately returned variable is inlined).
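Both DLT tables get the same treatment: a local that is assigned and then returned unchanged is folded into the return statement. Outside of DLT, the rewrite looks like this (the list filter is a hypothetical stand-in for the DataFrame filter):

```python
rows = [{"issue_year": 2014}, {"issue_year": 2017}]  # hypothetical feature rows

def train_rows_before(rows):
    train_data = [r for r in rows if r["issue_year"] <= 2015]
    return train_data  # the temporary name adds nothing

def train_rows_after(rows):
    return [r for r in rows if r["issue_year"] <= 2015]

assert train_rows_before(rows) == train_rows_after(rows)
```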
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -66,13 +66,19 @@ def sales_orders_cleaned():
| } | ||
| ) | ||
| def sales_order_in_la(): | ||
| df = dlt.read_stream("sales_orders_cleaned").where("city == 'Los Angeles'") | ||
| df = dlt.read_stream("sales_orders_cleaned").where("city == 'Los Angeles'") | ||
| df = df.select(df.city, df.order_date, df.customer_id, df.customer_name, explode(df.ordered_products).alias("ordered_products_explode")) | ||
| dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name, df.ordered_products_explode.curr.alias("currency"))\ | ||
| .agg(sum(df.ordered_products_explode.price).alias("sales"), sum(df.ordered_products_explode.qty).alias("qantity")) | ||
| return dfAgg | ||
| return df.groupBy( | ||
| df.order_date, | ||
| df.city, | ||
| df.customer_id, | ||
| df.customer_name, | ||
| df.ordered_products_explode.curr.alias("currency"), | ||
| ).agg( | ||
| sum(df.ordered_products_explode.price).alias("sales"), | ||
| sum(df.ordered_products_explode.qty).alias("qantity"), | ||
| ) | ||
Sourcery, on lines -69 to +81: Function sales_order_in_la refactored (the aggregated DataFrame is returned directly instead of being assigned to dfAgg first).
| # COMMAND ---------- | ||
@@ -85,10 +91,16 @@ def sales_order_in_la():
| } | ||
| ) | ||
| def sales_order_in_chicago(): | ||
| df = dlt.read_stream("sales_orders_cleaned").where("city == 'Chicago'") | ||
| df = dlt.read_stream("sales_orders_cleaned").where("city == 'Chicago'") | ||
| df = df.select(df.city, df.order_date, df.customer_id, df.customer_name, explode(df.ordered_products).alias("ordered_products_explode")) | ||
| dfAgg = df.groupBy(df.order_date, df.city, df.customer_id, df.customer_name, df.ordered_products_explode.curr.alias("currency"))\ | ||
| .agg(sum(df.ordered_products_explode.price).alias("sales"), sum(df.ordered_products_explode.qty).alias("qantity")) | ||
| return dfAgg | ||
| return df.groupBy( | ||
| df.order_date, | ||
| df.city, | ||
| df.customer_id, | ||
| df.customer_name, | ||
| df.ordered_products_explode.curr.alias("currency"), | ||
| ).agg( | ||
| sum(df.ordered_products_explode.price).alias("sales"), | ||
| sum(df.ordered_products_explode.qty).alias("qantity"), | ||
| ) | ||
Sourcery, on lines -88 to +106: Function sales_order_in_chicago refactored in the same way.
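Both city tables share one shape: explode the `ordered_products` array, then sum price and quantity per order, customer, and currency. A self-contained PySpark sketch of that shape with a one-row hypothetical input (the sketch spells the quantity alias correctly; the source keeps its `qantity` misspelling):

```python
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import explode, sum as sum_

spark = SparkSession.builder.getOrCreate()

# Hypothetical one-row miniature of sales_orders_cleaned
df = spark.createDataFrame(
    [("Los Angeles", "2022-01-01", 1, "Ada",
      [Row(curr="USD", price=10, qty=2), Row(curr="USD", price=5, qty=1)])],
    "city string, order_date string, customer_id int, customer_name string, "
    "ordered_products array<struct<curr:string,price:int,qty:int>>",
)

exploded = df.select(
    df.city, df.order_date, df.customer_id, df.customer_name,
    explode(df.ordered_products).alias("p"),
)

# One output row per (order_date, city, customer, currency)
agg = exploded.groupBy(
    exploded.order_date, exploded.city, exploded.customer_id,
    exploded.customer_name, exploded.p.curr.alias("currency"),
).agg(
    sum_(exploded.p.price).alias("sales"),
    sum_(exploded.p.qty).alias("quantity"),
)
agg.show()  # sales=15, quantity=3 for the single USD group
```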
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -77,13 +77,12 @@ def on_status(self, status):
| def write_file(self): | ||
| file_timestamp = calendar.timegm(time.gmtime()) | ||
| fname = self.filename + '/tweets_' + str(file_timestamp) + '.json' | ||
| fname = f'{self.filename}/tweets_{str(file_timestamp)}.json' | ||
| f = open(fname, 'w') | ||
| for tweet in self.tweet_stack: | ||
| f.write(jsonpickle.encode(tweet._json, unpicklable=False) + '\n') | ||
| f.close() | ||
| with open(fname, 'w') as f: | ||
| for tweet in self.tweet_stack: | ||
| f.write(jsonpickle.encode(tweet._json, unpicklable=False) + '\n') | ||
Sourcery, on lines -80 to +85: Function write_file refactored (the filename is built with an f-string and the manual open/close is replaced with a with block).
| print("Wrote local file ", fname) | ||
| def on_error(self, status_code): | ||
Sourcery: Lines 22-65 refactored with the following changes: (use-fstring-for-concatenation).
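Finally, the write_file change swaps a manual open()/close() pair for a with block, which guarantees the handle is closed even if a write raises, and builds the filename with an f-string. A dependency-free sketch of the same method (json.dumps stands in for jsonpickle, and the class is pared down to what the method needs):

```python
import calendar
import json
import time


class TweetWriter:
    def __init__(self, filename, tweet_stack):
        self.filename = filename        # output directory, per the source
        self.tweet_stack = tweet_stack  # buffered tweets, plain dicts here

    def write_file(self):
        file_timestamp = calendar.timegm(time.gmtime())
        fname = f"{self.filename}/tweets_{file_timestamp}.json"
        # The with block closes the file even if a write raises.
        with open(fname, "w") as f:
            for tweet in self.tweet_stack:
                f.write(json.dumps(tweet) + "\n")
        print("Wrote local file ", fname)


TweetWriter(".", [{"id": 1, "text": "hello"}]).write_file()
```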