Skip to content

Dev 102690 cdc resource #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 20, 2025
Merged

Conversation

yoavshemesh-riskified
Copy link

No description provided.

@yoavshemesh-riskified yoavshemesh-riskified requested a review from a team as a code owner March 18, 2025 15:38
CDCKafkaConnectionName: {
Type: schema.TypeString,
Required: true,
Description: "kafka user name",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka external connection name?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too long

Type: schema.TypeString,
Required: true,
ValidateFunc: validation.StringIsNotEmpty,
Description: "schema registry url",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schema registry external connection name

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? too long

startFrom := d.Get(CDCStartFrom).(string)

database := db.client.databaseName
txn, err := startTransaction(db.client, database)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the linter said. If you don't need the return value you can assign
_ , err := start...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it is used.
I will push a fix

}

var initialScanClause string
if d.Get(CDCInitialScan).(string) == "yes" || d.Get(CDCInitialScan).(string) == "true" {
Copy link

@harelsafra harelsafra Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to support "only"? or will these kind of jobs be cleaned up later and cause state drift?

And I think it's better to use just "yes" and not also "true". It can cause errors down the line if the user sets "true" and the read returns "yes" - TF will try to change it all the time

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the thing is that if anyone passes true, which is reasonably more intuitive than Yes, they get No.
I might need to check it in at validator phase maybe

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a validator for "yes","no"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then delete
|| d.Get(CDCInitialScan).(string) == "true"
for clarity please

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

`CREATE CHANGEFEED FOR TABLE %v INTO "external://%s" WITH %s updated, %s diff, on_error='pause', format = avro, avro_schema_prefix='%s_', confluent_schema_registry = 'external://%s'`,
tableListStr, kafkaConnectionName, initialScanClause, cursorClause, avroSchemaPrefix, registryConnectionName,
)
txn, err = startTransaction(db.client, database)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add check err return

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


// Setting the table list
currentTableListInterface := d.Get(CDCtableList)
if len(currentTableListInterface.([]interface{})) == 0 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section isn't really needed. Read should just read the infra state and return it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not true,
When importing the tableList is empty. Had this in debug, in this case it gets the list from the DB.
The import is only with jobID, all other values are from the DB read

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. read needs to read from the database (infra in TF speak). It doesn't need to look at the current state at all - just bring back what's in the database

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok then

return err
}
defer deferredRollback(txn)
txn.Exec(fmt.Sprintf("CANCEL JOB %s", d.Id()))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the linter said. Need to assign the err return value before checking it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err = txn.Commit(); err != nil {
return fmt.Errorf("could not commit transaction: %w", err)
}
waitForJobStatus(db, jobID, "PAUSED")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what the linter said

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tov

query := fmt.Sprintf("SELECT status FROM [SHOW JOB %s]", jobID)
err = txn.QueryRow(query).Scan(&status)
if err != nil {
txn.Rollback()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what the linter said

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}

if strings.ToUpper(status) == strings.ToUpper(requestedStatus) {
txn.Commit()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

return nil
}

txn.Commit()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

return fmt.Errorf("could not commit transaction: %w", err)
}
d.SetId(connName)
d.Set(ConnUrl, strings.TrimSpace(connUrl))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may cause drift errors down the line. The .tf file will have one thing but the state another one

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

return fmt.Errorf("Error starting transaction: %w", err)
}
var connUrl string
if err := txn.QueryRow("select connection_uri from [show external connections] where connection_name= $1", connName).Scan(&connUrl); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "Show external connection $1" ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if err := txn.QueryRow("select connection_uri from [show external connections] where connection_name= $1", connName).Scan(&connUrl); err != nil {
return fmt.Errorf("Error reading EXTERNAL CONNECTION: %w", err)
}
// d.Set(ConnUrl, strings.TrimSpace(connUrl))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the URL commented?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed.
if a url is changed then you should drop-create it. there isnt an ALTER

@yoavshemesh-riskified yoavshemesh-riskified merged commit 907b4f5 into master Mar 20, 2025
7 of 9 checks passed
@yoavshemesh-riskified yoavshemesh-riskified deleted the DEV-102690-cdc-resource branch March 20, 2025 13:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants