Activity
liurenjie1024 commented on Aug 24, 2023
Blocked by #38
liurenjie1024 commented on Aug 25, 2023
From our experience in developing https://github.com/icelake-io/icelake, we should implement it on arrow arrays when introducing the partition writer.
cc @ZENOTME Could you help to port our implementation here later?
ZENOTME commented on Aug 25, 2023
Sure. I'm implementing the transform functions for temporal types in icelake now. I will port them over gradually.
JanKaul commented on Aug 29, 2023
Regarding the representation of partition values, which relates to the use of transforms: I'm wondering whether the result of a transform should be a scalar value. There should be only one partition value for each partition, which is stored in the data file, and the partition values themselves are not written to disk.
liurenjie1024 commented on Aug 29, 2023
Yeah, I can understand that it's a little difficult to see why we should implement this on arrow. I will use a simplified partition writer to explain:
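```rust
// Simplified pseudocode: TransformFunction, PartitionValue, ParquetWriter
// and to_iceberg_partition_value stand in for the real types.
struct PartitionWriter {
    transform_func: Box<dyn TransformFunction>,
    parquet_writers: HashMap<PartitionValue, ParquetWriter>,
}

impl PartitionWriter {
    fn append(&mut self, record_batch: RecordBatch) -> Result<()> {
        // We assume the partition function applies to the first column.
        let partition_values = self
            .transform_func
            .apply(record_batch.column(0))
            .map(|arrow_row| to_iceberg_partition_value(arrow_row))?;
        // Multiplex each row to the parquet writer for its partition value.
        for i in 0..record_batch.num_rows() {
            self.parquet_writers
                .get_mut(&partition_values.get(i))
                .expect("a writer exists for every partition value")
                .append(record_batch.row(i))?;
        }
        Ok(())
    }
}
```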
With this approach, we can compute partition values in a vectorized way. For a more detailed implementation, please refer to https://github.com/icelake-io/icelake/blob/aeba46c62932b482340d34550289a1b3a9f30486/icelake/src/io/task_writer.rs#L175
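To make "vectorized" concrete, here is a minimal sketch of what such a transform might look like on an arrow array; `truncate_i32` is a hypothetical helper (not icelake's actual API) implementing Iceberg's integer `truncate[W]` rule over a whole column at once:

```rust
use arrow::array::Int32Array;

// Hypothetical vectorized transform: Iceberg's truncate[W] for integers,
// truncate(v, W) = v - (((v % W) + W) % W), applied to an entire column.
// Nulls pass through unchanged.
fn truncate_i32(input: &Int32Array, width: i32) -> Int32Array {
    input
        .iter()
        .map(|v| v.map(|v| v - (((v % width) + width) % width)))
        .collect()
}

fn main() {
    let col = Int32Array::from(vec![Some(1), Some(-1), None, Some(123)]);
    // truncate[10]: 1 -> 0, -1 -> -10, null -> null, 123 -> 120
    let partition_values = truncate_i32(&col, 10);
    println!("{:?}", partition_values);
}
```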
JanKaul commented on Aug 29, 2023
Ah okay, thanks for the explanation. So the reason is to "multiplex" the rows into the correct parquet writer according to their partition values.
Do you think it's possible to use the capabilities of the query engine to repartition the data? Maybe there are some optimizations that can be performed if this is expressed as a Repartition operator.
ZENOTME commented on Aug 29, 2023
Good point! In icelake, the repartitioning is done with a HashMap, and arrow provides an efficient way to hash the partition values. As you say, this work could perhaps be expressed as a Repartition operator. We can investigate that.
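As a rough illustration of the HashMap-based approach (a hypothetical helper, assuming the partition value for each row has already been computed): bucket the row indices by partition value, then gather each bucket, e.g. with arrow's `take` kernel, and append it to that partition's writer.

```rust
use std::collections::HashMap;

// Hypothetical sketch: group row indices by their partition value so each
// group can be gathered (e.g. with arrow's `take` kernel) and appended to
// the parquet writer for that partition.
fn group_rows_by_partition(partition_values: &[i64]) -> HashMap<i64, Vec<u32>> {
    let mut groups: HashMap<i64, Vec<u32>> = HashMap::new();
    for (row, value) in partition_values.iter().enumerate() {
        groups.entry(*value).or_default().push(row as u32);
    }
    groups
}
```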
liurenjie1024 commented on Aug 30, 2023
Yeah, it's possible. However, we usually don't bind a compute task to one partition of the storage format, since the parallelism of compute tasks is determined by many factors, not just the table partitioning. I think the hierarchy of iceberg's TaskWriter explains things well enough:

PartitionedWriter is used when the data is already partitioned and sorted according to the partition value.
PartitionedFanoutWriter is used when the data is not partitioned by a shuffle and needs to be processed in the writer.
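To illustrate the difference between the two strategies, here is a minimal, hypothetical sketch; the types and field layouts are assumptions for illustration, not iceberg's actual definitions:

```rust
use std::collections::HashMap;

// Placeholder types, assumed for illustration only.
type PartitionValue = String;
struct DataWriter;

// Input is already sorted by partition value, so at most one file needs
// to be open at a time: roll to a new writer whenever the value changes.
struct PartitionedWriter {
    current: Option<(PartitionValue, DataWriter)>,
}

// Input arrives unsorted (no upstream shuffle), so keep one open writer
// per partition value and fan each row out to the matching writer.
struct PartitionedFanoutWriter {
    writers: HashMap<PartitionValue, DataWriter>,
}
```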
JanKaul commented on Aug 30, 2023
That makes sense. Thanks again for the explanation.
Generally it makes sense to have a PartitionWriter that doesn't depend on a query engine. Then this could be used in other libraries.
liurenjie1024 commented on Dec 14, 2023
Closing since this is already finished.