Commit 76897d0

feat(databricks-jdbc-driver): Implement connection checking without waking up SQL warehouse
1 parent 940c30f commit 76897d0

File tree

2 files changed: +71 -28 lines changed

packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts

+40 -28
@@ -4,6 +4,7 @@
  * @fileoverview The `DatabricksDriver` and related types declaration.
  */
 
+import fetch from 'node-fetch';
 import { assertDataSource, getEnv, } from '@cubejs-backend/shared';
 import {
   DatabaseStructure,
@@ -20,6 +21,7 @@ import { JDBCDriver, JDBCDriverConfiguration, } from '@cubejs-backend/jdbc-drive
 import { DatabricksQuery } from './DatabricksQuery';
 import {
   extractAndRemoveUidPwdFromJdbcUrl,
+  parseDatabricksJdbcUrl,
   resolveJDBCDriver
 } from './helpers';
 
@@ -124,6 +126,11 @@ type ColumnInfo = {
   type: GenericDataBaseType;
 };
 
+export type ParsedConnectionProperties = {
+  host: string,
+  warehouseId: string,
+};
+
 const DatabricksToGenericType: Record<string, string> = {
   binary: 'hll_datasketches',
   'decimal(10,0)': 'bigint',
@@ -143,6 +150,8 @@ export class DatabricksDriver extends JDBCDriver {
    */
   protected readonly config: DatabricksDriverConfiguration;
 
+  private readonly parsedConnectionProperties: ParsedConnectionProperties;
+
   public static dialectClass() {
     return DatabricksQuery;
   }
@@ -262,38 +271,50 @@
 
     super(config);
     this.config = config;
+    this.parsedConnectionProperties = parseDatabricksJdbcUrl(url);
     this.showSparkProtocolWarn = showSparkProtocolWarn;
   }
 
-  /**
-   * @override
-   */
-  public readOnly() {
+  public override readOnly() {
     return !!this.config.readOnly;
   }
 
-  /**
-   * @override
-   */
-  public capabilities(): DriverCapabilities {
+  public override capabilities(): DriverCapabilities {
     return {
       unloadWithoutTempTable: true,
       incrementalSchemaLoading: true
     };
   }
 
-  /**
-   * @override
-   */
-  public setLogger(logger: any) {
+  public override setLogger(logger: any) {
     super.setLogger(logger);
     this.showDeprecations();
   }
 
-  /**
-   * @override
-   */
-  public async loadPreAggregationIntoTable(
+  public override async testConnection() {
+    const token = `Bearer ${this.config.properties.PWD}`;
+
+    const res = await fetch(`https://${this.parsedConnectionProperties.host}/api/2.0/sql/warehouses/${this.parsedConnectionProperties.warehouseId}`, {
+      headers: { Authorization: token },
+    });
+
+    if (!res.ok) {
+      throw new Error(`Databricks API error: ${res.statusText}`);
+    }
+
+    const data = await res.json();
+
+    if (['DELETING', 'DELETED'].includes(data.state)) {
+      throw new Error(`Warehouse is being deleted (current state: ${data.state})`);
+    }
+
+    // There is also DEGRADED status, but it doesn't mean that cluster is 100% not working...
+    if (data.health?.status === 'FAILED') {
+      throw new Error(`Warehouse is unhealthy: ${data.health?.summary}. Details: ${data.health?.details}`);
+    }
+  }
+
+  public override async loadPreAggregationIntoTable(
     preAggregationTableName: string,
     loadSql: string,
     params: unknown[],
@@ -320,10 +341,7 @@
     }
   }
 
-  /**
-   * @override
-   */
-  public async query<R = unknown>(
+  public override async query<R = unknown>(
     query: string,
     values: unknown[],
   ): Promise<R[]> {
@@ -357,10 +375,7 @@
     }
   }
 
-  /**
-   * @override
-   */
-  public dropTable(tableName: string, options?: QueryOptions): Promise<unknown> {
+  public override dropTable(tableName: string, options?: QueryOptions): Promise<unknown> {
     const tableFullName = `${
       this.config?.catalog ? `${this.config.catalog}.` : ''
     }${tableName}`;
@@ -392,10 +407,7 @@
     }
   }
 
-  /**
-   * @override
-   */
-  protected async getCustomClassPath() {
+  protected override async getCustomClassPath() {
     return resolveJDBCDriver();
   }
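
The core of the change: instead of the inherited JDBC-based connection test, which would resume a suspended warehouse, `testConnection` now asks the Databricks SQL Warehouses REST API (`GET /api/2.0/sql/warehouses/{id}`) for warehouse metadata, which reports the warehouse's state without starting it. A minimal standalone sketch of the same call; the host, warehouse ID, and token handling are placeholders, not values from this commit:

import fetch from 'node-fetch';

// Metadata-only call: a STOPPED warehouse stays stopped. Per the diff above,
// the response carries `state` (e.g. DELETING, DELETED) and an optional
// `health` object with `status`, `summary`, and `details`.
async function checkWarehouse(host: string, warehouseId: string, token: string) {
  const res = await fetch(`https://${host}/api/2.0/sql/warehouses/${warehouseId}`, {
    headers: { Authorization: `Bearer ${token}` },
  });
  if (!res.ok) {
    throw new Error(`Databricks API error: ${res.statusText}`);
  }
  // Typed as `any` so the sketch works with both node-fetch v2 and v3 typings.
  const data: any = await res.json();
  return { state: data.state, health: data.health?.status };
}

// Usage with placeholder values:
// await checkWarehouse('dbc-1234abcd-5678.cloud.databricks.com', '1234567890abcdef', process.env.DATABRICKS_TOKEN!);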

packages/cubejs-databricks-jdbc-driver/src/helpers.ts

+31
@@ -2,6 +2,7 @@ import fs from 'fs';
 import path from 'path';
 
 import { downloadJDBCDriver, OSS_DRIVER_VERSION } from './installer';
+import type { ParsedConnectionProperties } from './DatabricksDriver';
 
 async function fileExistsOr(
   fsPath: string,
@@ -51,3 +52,33 @@ export function extractAndRemoveUidPwdFromJdbcUrl(jdbcUrl: string): [uid: string
 
   return [uid, pwd, cleanedUrl];
 }
+
+export function parseDatabricksJdbcUrl(jdbcUrl: string): ParsedConnectionProperties {
+  const jdbcPrefix = 'jdbc:databricks://';
+  const urlWithoutPrefix = jdbcUrl.slice(jdbcPrefix.length);
+
+  const [hostPortAndPath, ...params] = urlWithoutPrefix.split(';');
+  const [host] = hostPortAndPath.split(':');
+
+  const paramMap = new Map<string, string>();
+  for (const param of params) {
+    const [key, value] = param.split('=');
+    if (key && value) {
+      paramMap.set(key, value);
+    }
+  }
+
+  const httpPath = paramMap.get('httpPath');
+  if (!httpPath) {
+    throw new Error('Missing httpPath in JDBC URL');
+  }
+
+  const warehouseMatch = httpPath.match(/\/warehouses\/([a-zA-Z0-9]+)/);
+  if (!warehouseMatch) {
+    throw new Error('Could not extract warehouseId from httpPath');
+  }
+
+  const warehouseId = warehouseMatch[1];
+
+  return { host, warehouseId };
+}
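
For reference, `parseDatabricksJdbcUrl` takes the host from the `host[:port]` segment before the first `;` and pulls the warehouse ID out of the `httpPath` parameter. A sketch of input and output; the URL below is invented for illustration:

// Hypothetical JDBC URL; the host and warehouse ID are placeholders.
const url = 'jdbc:databricks://dbc-1234abcd-5678.cloud.databricks.com:443;'
  + 'transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/1234567890abcdef';

const { host, warehouseId } = parseDatabricksJdbcUrl(url);
// host        === 'dbc-1234abcd-5678.cloud.databricks.com'
// warehouseId === '1234567890abcdef'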
