
Add transform function implementation. · Issue #32 · apache/iceberg-rust


Description

No description provided.

Activity

liurenjie1024 (Contributor, Author) commented on Aug 24, 2023

Blocked by #38

liurenjie1024 (Contributor, Author) commented on Aug 25, 2023

From our experience in developing https://github.com/icelake-io/icelake, we should implement it on arrow array when introducing partition writer.

cc @ZENOTME Could you help port our implementation here later?

ZENOTME (Contributor) commented on Aug 25, 2023

cc @ZENOTME Could you help port our implementation here later?

Sure. I'm implementing the transform functions for temporal types in icelake now, and I will port them here gradually later.
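[Editor's sketch] For context on what a vectorized temporal transform looks like: per the Iceberg table spec, the `year` transform maps a date to the number of years since 1970. The sketch below computes it column-wise from days-since-epoch using Howard Hinnant's civil-from-days algorithm, with a plain `Vec` standing in for an Arrow array; the function names are illustrative, not icelake's actual API.

```rust
/// Convert days since 1970-01-01 to a (year, month, day) civil date
/// using Howard Hinnant's "civil_from_days" algorithm.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = z - era * 146_097; // day of era, in [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    let mp = (5 * doy + 2) / 153;
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = if mp < 10 { mp + 3 } else { mp - 9 } as u32;
    let y = yoe + era * 400 + if m <= 2 { 1 } else { 0 };
    (y, m, d)
}

/// Iceberg-style `year` transform: years since 1970, applied column-wise.
fn year_transform(days_since_epoch: &[i64]) -> Vec<i64> {
    days_since_epoch
        .iter()
        .map(|&d| civil_from_days(d).0 - 1970)
        .collect()
}

fn main() {
    // Day 0 = 1970-01-01, day 365 = 1971-01-01, day -1 = 1969-12-31.
    let out = year_transform(&[0, 365, -1]);
    println!("{:?}", out); // [0, 1, -1]
}
```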

JanKaul (Collaborator) commented on Aug 29, 2023

Regarding the representation of partition values, which relates to the use of transforms: I'm wondering whether the result of a transform should be a scalar value. There should only be one partition value per partition, which is stored in the data file, and the partition values themselves are not written to disk.

liurenjie1024 (Contributor, Author) commented on Aug 29, 2023

Regarding the representation of partition values, which relates to the use of transforms: I'm wondering whether the result of a transform should be a scalar value. There should only be one partition value per partition, which is stored in the data file, and the partition values themselves are not written to disk.

Yeah, I can understand that it's a little difficult to see why we should implement this on Arrow arrays. I'll use a simplified partition writer to explain:

struct PartitionWriter {
  transform_func: Box<dyn TransformFunction>,
  parquet_writers: HashMap<PartitionValue, ParquetWriter>,
}

impl PartitionWriter {
  fn append(&mut self, record_batch: RecordBatch) -> Result<()> {
    // We assume the partition function applies to the first column.
    let partition_values = self
      .transform_func
      .apply(record_batch.column(0))
      .map(|arrow_row| to_iceberg_partition_value(arrow_row))?;

    // Route each row to the parquet writer for its partition value.
    for i in 0..record_batch.num_rows() {
      self.parquet_writers
        .get_mut(&partition_values[i])
        .expect("writer for partition value")
        .append(record_batch.slice(i, 1))?;
    }
    Ok(())
  }
}

With this approach, we can compute partition values in a vectorized manner. For a more detailed implementation, please refer to https://github.com/icelake-io/icelake/blob/aeba46c62932b482340d34550289a1b3a9f30486/icelake/src/io/task_writer.rs#L175
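[Editor's sketch] To make the transform-then-multiplex idea concrete without pulling in the arrow or parquet crates, here is a self-contained sketch. Plain `Vec`s stand in for Arrow arrays, per-partition row buffers stand in for parquet writers, and the width-10 truncate transform and all names are illustrative, not iceberg-rust's API.

```rust
use std::collections::HashMap;

/// Iceberg-style `truncate[10]` transform, applied column-wise:
/// each value maps to the lower bound of its width-10 bucket.
/// rem_euclid keeps the bucket lower bound correct for negatives too.
fn truncate10(column: &[i64]) -> Vec<i64> {
    column.iter().map(|&v| v - v.rem_euclid(10)).collect()
}

/// Multiplex rows into per-partition buffers keyed by partition value,
/// mirroring the PartitionWriter above.
fn partition_rows(column: &[i64]) -> HashMap<i64, Vec<i64>> {
    // Step 1: compute all partition values in one vectorized pass.
    let partition_values = truncate10(column);
    // Step 2: route each row to the buffer for its partition value.
    let mut writers: HashMap<i64, Vec<i64>> = HashMap::new();
    for (&row, &pv) in column.iter().zip(partition_values.iter()) {
        writers.entry(pv).or_default().push(row);
    }
    writers
}

fn main() {
    let writers = partition_rows(&[3, 17, 5, 21]);
    // Rows 3 and 5 land in partition 0; 17 in partition 10; 21 in 20.
    println!("{:?}", writers.get(&0)); // Some([3, 5])
}
```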

JanKaul (Collaborator) commented on Aug 29, 2023

Ah okay, thanks for the explanation. So the reason is to "multiplex" the rows into the correct parquet writers according to their partition values.

Do you think it's possible to use the capabilities of the query engine to repartition the data? Maybe there are some optimizations that could be performed if this were expressed as a Repartition operator.

ZENOTME (Contributor) commented on Aug 29, 2023

Do you think it's possible to use the capabilities of the query engine to repartition the data? Maybe there are some optimizations that could be performed if this were expressed as a Repartition operator.

Good point! In icelake, the repartitioning is done with a HashMap, and Arrow provides an efficient way to hash the partition values. As you say, this work could perhaps be expressed as a Repartition operator. We can investigate that.

liurenjie1024 (Contributor, Author) commented on Aug 30, 2023

Do you think it's possible to use the capabilities of the query engine to repartition the data? Maybe there are some optimizations that could be performed if this were expressed as a Repartition operator.

Yeah, it's possible. However, we usually don't bind a compute task to one partition of the storage layout, since the parallelism of compute tasks is determined by many factors, not just table partitioning. I think the hierarchy of iceberg's TaskWriter explains things well enough:

[Image: hierarchy of iceberg's TaskWriter implementations]

PartitionedWriter is used when data is already partitioned and sorted according to partition values.

PartitionedFanoutWriter is used when data is not partitioned by a shuffle and needs to be processed in the writer.
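[Editor's sketch] The difference between the two writers can be sketched in plain Rust. Row buffers stand in for data files, and the function names are illustrative, not iceberg-rust's API: the clustered writer assumes rows arrive grouped by partition value, so it keeps only one "file" open and rolls to a new one at each partition boundary, while the fanout writer keeps one open "file" per partition value.

```rust
use std::collections::HashMap;

/// Clustered/sorted input: keep one open "file"; roll on partition change.
/// Produces one file per contiguous run of equal partition values.
fn write_clustered(rows: &[(i64, &str)]) -> Vec<(i64, Vec<String>)> {
    let mut files: Vec<(i64, Vec<String>)> = Vec::new();
    for &(pv, data) in rows {
        match files.last_mut() {
            // Same partition as the currently open file: keep appending.
            Some((current, file)) if *current == pv => file.push(data.to_string()),
            // Partition boundary (or first row): open a new file.
            _ => files.push((pv, vec![data.to_string()])),
        }
    }
    files
}

/// Unclustered input: fan out to one open "file" per partition value.
fn write_fanout(rows: &[(i64, &str)]) -> HashMap<i64, Vec<String>> {
    let mut files: HashMap<i64, Vec<String>> = HashMap::new();
    for &(pv, data) in rows {
        files.entry(pv).or_default().push(data.to_string());
    }
    files
}

fn main() {
    let unsorted = [(1, "a"), (2, "b"), (1, "c")];
    // The clustered writer emits 3 files (it re-opens partition 1),
    // while the fanout writer emits 2, at the cost of more open files.
    println!("{} vs {}", write_clustered(&unsorted).len(), write_fanout(&unsorted).len());
}
```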

JanKaul (Collaborator) commented on Aug 30, 2023

That makes sense. Thanks again for the explanation.

Generally it makes sense to have a PartitionWriter that doesn't depend on a query engine; it could then be used by other libraries.

liurenjie1024 (Contributor, Author) commented on Dec 14, 2023

Closing since this is already finished.



