-
Notifications
You must be signed in to change notification settings - Fork 0
Dev 102690 cdc resource #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
CDCKafkaConnectionName: { | ||
Type: schema.TypeString, | ||
Required: true, | ||
Description: "kafka user name", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka external connection name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too long
Type: schema.TypeString, | ||
Required: true, | ||
ValidateFunc: validation.StringIsNotEmpty, | ||
Description: "schema registry url", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
schema registry external connection name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? too long
startFrom := d.Get(CDCStartFrom).(string) | ||
|
||
database := db.client.databaseName | ||
txn, err := startTransaction(db.client, database) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What the linter said. If you don't need the return value you can assign
_ , err := start...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but it is used.
I will push a fix
} | ||
|
||
var initialScanClause string | ||
if d.Get(CDCInitialScan).(string) == "yes" || d.Get(CDCInitialScan).(string) == "true" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to support "only"? or will these kind of jobs be cleaned up later and cause state drift?
And I think it's better to use just "yes" and not also "true". It can cause errors down the line if the user sets "true" and the read returns "yes" - TF will try to change it all the time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the thing is that if anyone passes true, which is reasonably more intuitive than Yes, they get No.
I might need to check it in at validator phase maybe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a validator for "yes","no"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then delete
|| d.Get(CDCInitialScan).(string) == "true"
for clarity please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
`CREATE CHANGEFEED FOR TABLE %v INTO "external://%s" WITH %s updated, %s diff, on_error='pause', format = avro, avro_schema_prefix='%s_', confluent_schema_registry = 'external://%s'`, | ||
tableListStr, kafkaConnectionName, initialScanClause, cursorClause, avroSchemaPrefix, registryConnectionName, | ||
) | ||
txn, err = startTransaction(db.client, database) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add check err return
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
|
||
// Setting the table list | ||
currentTableListInterface := d.Get(CDCtableList) | ||
if len(currentTableListInterface.([]interface{})) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section isn't really needed. Read should just read the infra state and return it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not true,
When importing the tableList is empty. Had this in debug, in this case it gets the list from the DB.
The import is only with jobID, all other values are from the DB read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly. read needs to read from the database (infra in TF speak). It doesn't need to look at the current state at all - just bring back what's in the database
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok then
return err | ||
} | ||
defer deferredRollback(txn) | ||
txn.Exec(fmt.Sprintf("CANCEL JOB %s", d.Id())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What the linter said. Need to assign the err return value before checking it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if err = txn.Commit(); err != nil { | ||
return fmt.Errorf("could not commit transaction: %w", err) | ||
} | ||
waitForJobStatus(db, jobID, "PAUSED") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what the linter said
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tov
query := fmt.Sprintf("SELECT status FROM [SHOW JOB %s]", jobID) | ||
err = txn.QueryRow(query).Scan(&status) | ||
if err != nil { | ||
txn.Rollback() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what the linter said
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
} | ||
|
||
if strings.ToUpper(status) == strings.ToUpper(requestedStatus) { | ||
txn.Commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
return nil | ||
} | ||
|
||
txn.Commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
return fmt.Errorf("could not commit transaction: %w", err) | ||
} | ||
d.SetId(connName) | ||
d.Set(ConnUrl, strings.TrimSpace(connUrl)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may cause drift errors down the line. The .tf file will have one thing but the state another one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
return fmt.Errorf("Error starting transaction: %w", err) | ||
} | ||
var connUrl string | ||
if err := txn.QueryRow("select connection_uri from [show external connections] where connection_name= $1", connName).Scan(&connUrl); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe "Show external connection $1" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
if err := txn.QueryRow("select connection_uri from [show external connections] where connection_name= $1", connName).Scan(&connUrl); err != nil { | ||
return fmt.Errorf("Error reading EXTERNAL CONNECTION: %w", err) | ||
} | ||
// d.Set(ConnUrl, strings.TrimSpace(connUrl)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the URL commented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed.
if a url is changed then you should drop-create it. there isnt an ALTER
No description provided.