-
Notifications
You must be signed in to change notification settings - Fork 87
Fix decryption of V2 data pages #596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() { | |
|
|
||
| p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true) | ||
| p.dataPageHdrV2.NumValues = nrows | ||
| p.WriteDataPageHeaderV2(1024, 20, 10) | ||
| p.WriteDataPageHeaderV2(1024, 0, 0) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The original version of serializedPageReader.Next() ignored error checking of |
||
| p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed) | ||
| p.True(p.pageReader.Next()) | ||
| p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page()) | ||
|
|
@@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() { | |
|
|
||
| func TestWithEOFReader(t *testing.T) { | ||
| root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{ | ||
| schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1)}, -1) | ||
| schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1), | ||
| }, -1) | ||
| props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST)) | ||
|
|
||
| var buf bytes.Buffer | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [ | |
| return p.codec.Decode(buf, data), nil | ||
| } | ||
|
|
||
| func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error { | ||
| // if encrypted, we need to decrypt before decompressing | ||
| p.decompressBuffer.ResizeNoShrink(lenCompressed) | ||
| b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0]) | ||
| if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil { | ||
| return err | ||
| } | ||
| data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()) | ||
| // encrypted + uncompressed -> just copy the decrypted data to output buffer | ||
| if !compressed { | ||
| copy(buf, data) | ||
| return nil | ||
| } | ||
|
|
||
| // definition + repetition levels are always uncompressed | ||
| if levelsBytelen > 0 { | ||
| copy(buf, data[:levelsBytelen]) | ||
| data = data[levelsBytelen:] | ||
| } | ||
| p.codec.Decode(buf[levelsBytelen:], data) | ||
| return nil | ||
| } | ||
|
|
||
| func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error { | ||
| if !compressed { | ||
| // uncompressed, just read into the buffer | ||
| if _, err := io.ReadFull(rd, buf); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // definition + repetition levels are always uncompressed | ||
| if levelsBytelen > 0 { | ||
| if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| if _, err := p.decompress(p.r, lenCompressed-levelsBytelen, buf[levelsBytelen:]); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| type dataheader interface { | ||
| IsSetStatistics() bool | ||
| GetStatistics() *format.Statistics | ||
|
|
@@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo | |
| } | ||
| continue | ||
| } | ||
|
|
||
| rd.Discard(len(view) - int(remaining) + extra) | ||
| break | ||
| } | ||
|
|
@@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool { | |
| return false | ||
| } | ||
|
|
||
| if compressed { | ||
| if levelsBytelen > 0 { | ||
| io.ReadFull(p.r, buf.Bytes()[:levelsBytelen]) | ||
| } | ||
| if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil { | ||
| if p.cryptoCtx.DataDecryptor != nil { | ||
| if err := p.readV2Encrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil { | ||
| p.err = err | ||
| return false | ||
| } | ||
| } else { | ||
| io.ReadFull(p.r, buf.Bytes()) | ||
| if err := p.readV2Unencrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil { | ||
| p.err = err | ||
| return false | ||
| } | ||
| } | ||
|
Comment on lines
+858
to
868
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the issue is simply that we're not passing uncompressed v2 through the decryptor right? Wouldn't a simpler solution here just be to add the decryption check to the non-compressed branch? i.e. since the call to |
||
|
|
||
| if buf.Len() != lenUncompressed { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was getting "batch writing for V2 data pages must start at a row boundary" errors for V2 pages with the original version