-
Notifications
You must be signed in to change notification settings - Fork 48
[3/7] Telemetry Client Management: TelemetryClient and Provider #326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
db640fa
211f91c
c3779af
a105777
1cdb716
8721993
fd10a69
dd2bac8
4437ae9
e9c3138
32003e9
689e561
4df6ce0
2d41e2d
589e062
e474256
d7d2cec
a3c9042
5b47e7e
6110797
7c5c16c
f9bd330
42540bc
99eb2ab
11590f3
bda2cac
fce4499
de17f1b
363e2fc
38e9d03
e81ad15
d009957
990ec15
2749751
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ node_modules | |
| .nyc_output | ||
| coverage_e2e | ||
| coverage_unit | ||
| coverage | ||
| .clinic | ||
|
|
||
| dist | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| /** | ||
| * Copyright (c) 2025 Databricks Contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import IClientContext from '../contracts/IClientContext'; | ||
| import { LogLevel } from '../contracts/IDBSQLLogger'; | ||
|
|
||
| /** | ||
| * Telemetry client for a specific host. | ||
| * Managed by TelemetryClientProvider with reference counting. | ||
| * One client instance is shared across all connections to the same host. | ||
| */ | ||
| class TelemetryClient { | ||
| private closed: boolean = false; | ||
|
|
||
| constructor(private context: IClientContext, private host: string) { | ||
| const logger = context.getLogger(); | ||
| logger.log(LogLevel.debug, `Created TelemetryClient for host: ${host}`); | ||
| } | ||
|
|
||
| /** | ||
| * Gets the host associated with this client. | ||
| */ | ||
| getHost(): string { | ||
| return this.host; | ||
| } | ||
|
|
||
| /** | ||
| * Checks if the client has been closed. | ||
| */ | ||
| isClosed(): boolean { | ||
| return this.closed; | ||
| } | ||
|
|
||
| /** | ||
| * Closes the telemetry client and releases resources. | ||
| * Should only be called by TelemetryClientProvider when reference count reaches zero. | ||
| */ | ||
| close(): void { | ||
| if (this.closed) { | ||
| return; | ||
| } | ||
| try { | ||
| this.context.getLogger().log(LogLevel.debug, `Closing TelemetryClient for host: ${this.host}`); | ||
| } catch { | ||
| // swallow | ||
| } finally { | ||
| this.closed = true; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| export default TelemetryClient; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| /** | ||
| * Copyright (c) 2025 Databricks Contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import IClientContext from '../contracts/IClientContext'; | ||
| import { LogLevel } from '../contracts/IDBSQLLogger'; | ||
| import TelemetryClient from './TelemetryClient'; | ||
|
|
||
| /** | ||
| * Holds a telemetry client and its reference count. | ||
| * The reference count tracks how many connections are using this client. | ||
| */ | ||
| interface TelemetryClientHolder { | ||
| client: TelemetryClient; | ||
| refCount: number; | ||
| } | ||
|
|
||
| /** | ||
| * Manages one telemetry client per host. | ||
| * Prevents rate limiting by sharing clients across connections to the same host. | ||
| * Instance-based (not singleton), stored in DBSQLClient. | ||
| * | ||
| * Reference counts are incremented synchronously so there are no async races | ||
| * on the count itself. The map entry is deleted before awaiting close() so a | ||
| * concurrent getOrCreateClient call always gets a fresh instance. | ||
| */ | ||
| class TelemetryClientProvider { | ||
| private clients: Map<string, TelemetryClientHolder>; | ||
|
|
||
| constructor(private context: IClientContext) { | ||
| this.clients = new Map(); | ||
| const logger = context.getLogger(); | ||
| logger.log(LogLevel.debug, 'Created TelemetryClientProvider'); | ||
| } | ||
|
|
||
| /** | ||
| * Gets or creates a telemetry client for the specified host. | ||
| * Increments the reference count for the client. | ||
| * | ||
| * @param host The host identifier (e.g., "workspace.cloud.databricks.com") | ||
| * @returns The telemetry client for the host | ||
| */ | ||
| getOrCreateClient(host: string): TelemetryClient { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [F6] Host used as map key without normalization — alias confusion + unbounded growth — Severity: Medium
No size cap either — no upper bound on how big Fix:
Posted by code-review-squad • flagged by security, ops. |
||
| const logger = this.context.getLogger(); | ||
| let holder = this.clients.get(host); | ||
|
|
||
| if (!holder) { | ||
| // Create new client for this host | ||
| const client = new TelemetryClient(this.context, host); | ||
| holder = { | ||
| client, | ||
| refCount: 0, | ||
| }; | ||
| this.clients.set(host, holder); | ||
| logger.log(LogLevel.debug, `Created new TelemetryClient for host: ${host}`); | ||
| } | ||
|
|
||
| // Increment reference count | ||
| holder.refCount += 1; | ||
| logger.log(LogLevel.debug, `TelemetryClient reference count for ${host}: ${holder.refCount}`); | ||
|
|
||
| return holder.client; | ||
| } | ||
|
|
||
| /** | ||
| * Releases a telemetry client for the specified host. | ||
| * Decrements the reference count and closes the client when it reaches zero. | ||
| * | ||
| * @param host The host identifier | ||
| */ | ||
| releaseClient(host: string): void { | ||
| const logger = this.context.getLogger(); | ||
| const holder = this.clients.get(host); | ||
|
|
||
| if (!holder) { | ||
| logger.log(LogLevel.debug, `No TelemetryClient found for host: ${host}`); | ||
| return; | ||
| } | ||
|
|
||
| // Decrement reference count | ||
| holder.refCount -= 1; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [F5] Refcount underflow is silent; double-release swaps a live client — Severity: High
Also: header comment at lines 35-37 says "The map entry is deleted before awaiting close() so a concurrent getOrCreateClient call always gets a fresh instance." — but Fix: if (holder.refCount <= 0) {
logger.log(LogLevel.warn, `Unbalanced release for ${host}`);
return;
}
holder.refCount -= 1;And either make Add tests:
Posted by code-review-squad • flagged by devils-advocate, ops, security, architecture, language, test. |
||
| logger.log(LogLevel.debug, `TelemetryClient reference count for ${host}: ${holder.refCount}`); | ||
|
|
||
| // Close and remove client when reference count reaches zero. | ||
| // Delete from map before awaiting close so a concurrent getOrCreateClient | ||
| // creates a fresh client rather than receiving this closing one. | ||
| if (holder.refCount <= 0) { | ||
| this.clients.delete(host); | ||
| try { | ||
| holder.client.close(); | ||
| logger.log(LogLevel.debug, `Closed and removed TelemetryClient for host: ${host}`); | ||
| } catch (error: any) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [F9]
Also: Fix: } catch (error) {
const msg = error instanceof Error ? error.message : String(error);
logger.log(LogLevel.debug, `Error releasing TelemetryClient: ${msg}`);
}Same fix applies to Posted by code-review-squad • flagged by devils-advocate, security, language. |
||
| // Swallow all exceptions per requirement | ||
| logger.log(LogLevel.debug, `Error releasing TelemetryClient: ${error.message}`); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @internal Exposed for testing only. | ||
| */ | ||
| getRefCount(host: string): number { | ||
| const holder = this.clients.get(host); | ||
| return holder ? holder.refCount : 0; | ||
| } | ||
|
|
||
| /** | ||
| * @internal Exposed for testing only. | ||
| */ | ||
| getActiveClients(): Map<string, TelemetryClient> { | ||
| const result = new Map<string, TelemetryClient>(); | ||
| for (const [host, holder] of this.clients.entries()) { | ||
| result.set(host, holder.client); | ||
| } | ||
| return result; | ||
| } | ||
| } | ||
|
|
||
| export default TelemetryClientProvider; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| /** | ||
| * Copyright (c) 2025 Databricks Contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| /** | ||
| * Build full URL from host and path, handling protocol correctly. | ||
| * @param host The hostname (with or without protocol) | ||
| * @param path The path to append (should start with /) | ||
| * @returns Full URL with protocol | ||
| */ | ||
| // eslint-disable-next-line import/prefer-default-export | ||
| export function buildUrl(host: string, path: string): string { | ||
| // Check if host already has protocol | ||
| if (host.startsWith('http://') || host.startsWith('https://')) { | ||
| return `${host}${path}`; | ||
| } | ||
| // Add https:// if no protocol present | ||
| return `https://${host}${path}`; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| /** | ||
| * Copyright (c) 2025 Databricks Contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import { expect } from 'chai'; | ||
| import sinon from 'sinon'; | ||
| import TelemetryClient from '../../../lib/telemetry/TelemetryClient'; | ||
| import ClientContextStub from '../.stubs/ClientContextStub'; | ||
| import { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; | ||
|
|
||
| describe('TelemetryClient', () => { | ||
| const HOST = 'workspace.cloud.databricks.com'; | ||
|
|
||
| describe('Constructor', () => { | ||
| it('should create client with host', () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| expect(client.getHost()).to.equal(HOST); | ||
| expect(client.isClosed()).to.be.false; | ||
| }); | ||
|
|
||
| it('should log creation at debug level', () => { | ||
| const context = new ClientContextStub(); | ||
| const logSpy = sinon.spy(context.logger, 'log'); | ||
|
|
||
| new TelemetryClient(context, HOST); | ||
|
|
||
| expect(logSpy.calledWith(LogLevel.debug, `Created TelemetryClient for host: ${HOST}`)).to.be.true; | ||
| }); | ||
| }); | ||
|
|
||
| describe('getHost', () => { | ||
| it('should return the host identifier', () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| expect(client.getHost()).to.equal(HOST); | ||
| }); | ||
| }); | ||
|
|
||
| describe('isClosed', () => { | ||
| it('should return false initially', () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| expect(client.isClosed()).to.be.false; | ||
| }); | ||
|
|
||
| it('should return true after close', async () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| client.close(); | ||
|
|
||
| expect(client.isClosed()).to.be.true; | ||
| }); | ||
| }); | ||
|
|
||
| describe('close', () => { | ||
| it('should set closed flag', async () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| client.close(); | ||
|
|
||
| expect(client.isClosed()).to.be.true; | ||
| }); | ||
|
|
||
| it('should log closure at debug level', async () => { | ||
| const context = new ClientContextStub(); | ||
| const logSpy = sinon.spy(context.logger, 'log'); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| client.close(); | ||
|
|
||
| expect(logSpy.calledWith(LogLevel.debug, `Closing TelemetryClient for host: ${HOST}`)).to.be.true; | ||
| }); | ||
|
|
||
| it('should be idempotent', async () => { | ||
| const context = new ClientContextStub(); | ||
| const logSpy = sinon.spy(context.logger, 'log'); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| client.close(); | ||
| const firstCallCount = logSpy.callCount; | ||
|
|
||
| client.close(); | ||
|
|
||
| // Should not log again on second close | ||
| expect(logSpy.callCount).to.equal(firstCallCount); | ||
| expect(client.isClosed()).to.be.true; | ||
| }); | ||
|
|
||
| it('should swallow all exceptions', async () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| // Force an error by stubbing the logger | ||
| const error = new Error('Logger error'); | ||
| sinon.stub(context.logger, 'log').throws(error); | ||
|
|
||
| // Should not throw | ||
| client.close(); | ||
| // If we get here without throwing, the test passes | ||
| expect(true).to.be.true; | ||
| }); | ||
|
|
||
| it('should still set closed when logger throws', () => { | ||
| const context = new ClientContextStub(); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| sinon.stub(context.logger, 'log').throws(new Error('Logger error')); | ||
|
|
||
| client.close(); | ||
|
|
||
| expect(client.isClosed()).to.be.true; | ||
| }); | ||
| }); | ||
|
|
||
| describe('Context usage', () => { | ||
| it('should use logger from context', () => { | ||
| const context = new ClientContextStub(); | ||
| const logSpy = sinon.spy(context.logger, 'log'); | ||
|
|
||
| new TelemetryClient(context, HOST); | ||
|
|
||
| expect(logSpy.called).to.be.true; | ||
| }); | ||
|
|
||
| it('should log all messages at debug level only', async () => { | ||
| const context = new ClientContextStub(); | ||
| const logSpy = sinon.spy(context.logger, 'log'); | ||
| const client = new TelemetryClient(context, HOST); | ||
|
|
||
| client.close(); | ||
|
|
||
| logSpy.getCalls().forEach((call) => { | ||
| expect(call.args[0]).to.equal(LogLevel.debug); | ||
| }); | ||
| }); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[F10]
"deleted before awaiting close()"comment is false —close()is sync — Severity: MediumThe header comment at lines 35-37 says:
And the inline comment at 96-98 says:
Both describe a concurrency model the code isn't implementing:
TelemetryClient.close()is sync (TelemetryClient.ts:51), andholder.client.close()is called synchronously right afterthis.clients.delete(host). Nothing is awaited. Whenclose()becomes async in [5/7] (HTTP flush / abort / drain), the code will still not await — and the race the comment claims is handled will actually materialize: newgetOrCreateClientreturns a fresh instance while the old one's async close is still flushing.Fix: Either
close()returnPromise<void>now; changereleaseClienttoasync;await holder.client.close()after the map delete. Callers (including [5/7]) can be written to the final shape today.close()becomes async.Posted by code-review-squad • flagged by devils-advocate, architecture, language, maintainability.