diff --git a/parquet/encryption_read_config_test.go b/parquet/encryption_read_config_test.go index d0ddb3d2..5bf15f19 100644 --- a/parquet/encryption_read_config_test.go +++ b/parquet/encryption_read_config_test.go @@ -295,9 +295,7 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int) // Read all rows in column i = 0 for int96reader.HasNext() { - var ( - val [1]parquet.Int96 - ) + var val [1]parquet.Int96 // read one value at a time. the number of rows read is returned. values // read contains the number of non-null rows @@ -553,15 +551,34 @@ func (d *TestDecryptionSuite) checkResults(fileName string, decryptionConfig, en // once the file is read and the second exists in parquet-testing/data folder func (d *TestDecryptionSuite) TestDecryption() { tests := []struct { - file string - config uint + file string + config uint + isInDataStorage bool }{ - {"uniform_encryption.parquet.encrypted", 1}, - {"encrypt_columns_and_footer.parquet.encrypted", 2}, - {"encrypt_columns_plaintext_footer.parquet.encrypted", 3}, - {"encrypt_columns_and_footer_aad.parquet.encrypted", 4}, - {"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5}, - {"encrypt_columns_and_footer_ctr.parquet.encrypted", 6}, + {"uniform_encryption.parquet.encrypted", 1, true}, + {"uniform_encryption.parquet.uncompressed.encrypted", 1, false}, + {"uniform_encryption.parquet.v2.encrypted", 1, false}, + {"uniform_encryption.parquet.v2.uncompressed.encrypted", 1, false}, + {"encrypt_columns_and_footer.parquet.encrypted", 2, true}, + {"encrypt_columns_and_footer.parquet.uncompressed.encrypted", 2, false}, + {"encrypt_columns_and_footer.parquet.v2.encrypted", 2, false}, + {"encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", 2, false}, + {"encrypt_columns_plaintext_footer.parquet.encrypted", 3, true}, + {"encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", 3, false}, + {"encrypt_columns_plaintext_footer.parquet.v2.encrypted", 3, false}, + {"encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", 3, false}, + {"encrypt_columns_and_footer_aad.parquet.encrypted", 4, true}, + {"encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", 4, false}, + {"encrypt_columns_and_footer_aad.parquet.v2.encrypted", 4, false}, + {"encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", 4, false}, + {"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5, true}, + {"encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", 5, false}, + {"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", 5, false}, + {"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", 5, false}, + {"encrypt_columns_and_footer_ctr.parquet.encrypted", 6, true}, + {"encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", 6, false}, + {"encrypt_columns_and_footer_ctr.parquet.v2.encrypted", 6, false}, + {"encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", 6, false}, } for _, tt := range tests { d.Run(tt.file, func() { @@ -576,14 +593,16 @@ func (d *TestDecryptionSuite) TestDecryption() { } os.Remove(tmpFile) - file := path.Join(getDataDir(), tt.file) - d.Require().FileExists(file) + if tt.isInDataStorage { + file := path.Join(getDataDir(), tt.file) + d.Require().FileExists(file) - for idx := range d.decryptionConfigs { - decConfig := idx + 1 - d.Run(fmt.Sprintf("config %d", decConfig), func() { - d.checkResults(file, uint(decConfig), tt.config) - }) + for idx := range d.decryptionConfigs { + decConfig := idx + 1 + d.Run(fmt.Sprintf("config %d", decConfig), func() { + d.checkResults(file, uint(decConfig), tt.config) + }) + } } }) } diff --git a/parquet/encryption_write_config_test.go b/parquet/encryption_write_config_test.go index b2c10e5b..8bb7b547 100644 --- a/parquet/encryption_write_config_test.go +++ b/parquet/encryption_write_config_test.go @@ -61,9 +61,7 @@ import ( * keys. Use the alternative (AES_GCM_CTR_V1) algorithm. */ -var ( - tempdir string -) +var tempdir string type EncryptionConfigTestSuite struct { suite.Suite @@ -79,13 +77,16 @@ type EncryptionConfigTestSuite struct { columnEncryptionKey2 string } -func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string) { +func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string, writerOpts ...parquet.WriterProperty) { filename = filepath.Join(tempdir, filename) - props := parquet.NewWriterProperties( + opts := []parquet.WriterProperty{ parquet.WithPageIndexEnabled(true), parquet.WithCompression(compress.Codecs.Snappy), - parquet.WithEncryptionProperties(configs)) + parquet.WithEncryptionProperties(configs), + } + opts = append(opts, writerOpts...) + props := parquet.NewWriterProperties(opts...) outFile, err := os.Create(filename) en.Require().NoError(err) en.Require().NotNil(outFile) @@ -135,20 +136,18 @@ func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryption // write the int64 column, each row repeats twice int64Writer := nextColumn().(*file.Int64ColumnChunkWriter) - for i := 0; i < 2*en.rowsPerRG; i++ { + for i := 0; i < en.rowsPerRG; i++ { var ( - defLevel = [1]int16{1} - repLevel = [1]int16{0} - value = int64(i) * 1000 * 1000 * 1000 * 1000 + defLevels = []int16{1, 1} + repLevels = []int16{0, 1} + values = []int64{ + int64(i*2) * 1000 * 1000 * 1000 * 1000, + int64(i*2+1) * 1000 * 1000 * 1000 * 1000, + } ) - if i%2 == 0 { - repLevel[0] = 0 - } else { - repLevel[0] = 1 - } - n, err := int64Writer.WriteBatch([]int64{value}, defLevel[:], repLevel[:]) - en.EqualValues(1, n) + n, err := int64Writer.WriteBatch(values, defLevels, repLevels) + en.EqualValues(2, n) en.Require().NoError(err) } @@ -263,7 +262,11 @@ func (en *EncryptionConfigTestSuite) SetupSuite() { // (uniform encryption) func (en *EncryptionConfigTestSuite) TestUniformEncryption() { props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf")) - en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed)) + en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2)) + en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed), + parquet.WithDataPageVersion(parquet.DataPageV2)) } // Encryption config 2: Encrypt Two Columns and the Footer, with different keys @@ -273,7 +276,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooter() { encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2")) props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols)) - en.encryptFile(props, "tmp_encrypt_columns_and_footer.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed), + parquet.WithDataPageVersion(parquet.DataPageV2)) } // Encryption Config 3: encrypt two columns, with different keys. @@ -285,7 +292,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsPlaintextFooter() { encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2")) props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithPlaintextFooter()) - en.encryptFile(props, "tmp_encrypt_columns_plaintext_footer.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed), + parquet.WithDataPageVersion(parquet.DataPageV2)) } // Encryption Config 4: Encrypt two columns and the footer, with different keys @@ -296,7 +307,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2")) props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAadPrefix(en.fileName)) - en.encryptFile(props, "tmp_encrypt_columns_and_footer_aad.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed), + parquet.WithDataPageVersion(parquet.DataPageV2)) } // Encryption Config 5: Encrypt Two columns and the footer, with different keys @@ -307,7 +322,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2")) props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName), parquet.DisableAadPrefixStorage()) - en.encryptFile(props, "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed), + parquet.WithDataPageVersion(parquet.DataPageV2)) } // Encryption Config 6: Encrypt two columns and the footer, with different keys. @@ -318,7 +337,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterAesGcmCtr() { encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2")) props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr)) - en.encryptFile(props, "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted") + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2)) + en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed), + parquet.WithDataPageVersion(parquet.DataPageV2)) } func TestFileEncryption(t *testing.T) { diff --git a/parquet/file/column_reader_test.go b/parquet/file/column_reader_test.go index 25c26bc8..581f8957 100644 --- a/parquet/file/column_reader_test.go +++ b/parquet/file/column_reader_test.go @@ -551,9 +551,11 @@ func (p *PrimitiveReaderSuite) TestRepetitionLvlBytesWithMaxRepZero() { // bytes: the page header reports 1 byte for repetition levels even // though the max rep level is 0. If that byte isn't skipped then // we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0]. - pageData := [...]byte{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3, + pageData := [...]byte{ + 0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3, 0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + } p.pages = append(p.pages, file.NewDataPageV2(memory.NewBufferBytes(pageData[:]), batchSize, 1, batchSize, parquet.Encodings.DeltaBinaryPacked, 2, 1, int32(len(pageData)), false)) @@ -733,7 +735,6 @@ func TestFullSeekRow(t *testing.T) { for _, dataPageVersion := range []parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} { t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), func(t *testing.T) { - props := parquet.NewWriterProperties(parquet.WithAllocator(mem), parquet.WithDataPageVersion(dataPageVersion), parquet.WithDataPageSize(1), parquet.WithPageIndexEnabled(true)) diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go index 1927ca87..c764ff07 100644 --- a/parquet/file/file_reader_test.go +++ b/parquet/file/file_reader_test.go @@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() { p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true) p.dataPageHdrV2.NumValues = nrows - p.WriteDataPageHeaderV2(1024, 20, 10) + p.WriteDataPageHeaderV2(1024, 0, 0) p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed) p.True(p.pageReader.Next()) p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page()) @@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() { func TestWithEOFReader(t *testing.T) { root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{ - schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1)}, -1) + schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1), + }, -1) props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST)) var buf bytes.Buffer diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go index 1ba7ecbe..307c6c6a 100644 --- a/parquet/file/page_reader.go +++ b/parquet/file/page_reader.go @@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [ return p.codec.Decode(buf, data), nil } +func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error { + // if encrypted, we need to decrypt before decompressing + p.decompressBuffer.ResizeNoShrink(lenCompressed) + b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0]) + if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil { + return err + } + data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()) + // encrypted + uncompressed -> just copy the decrypted data to output buffer + if !compressed { + copy(buf, data) + return nil + } + + // definition + repetition levels are always uncompressed + if levelsBytelen > 0 { + copy(buf, data[:levelsBytelen]) + data = data[levelsBytelen:] + } + p.codec.Decode(buf[levelsBytelen:], data) + return nil +} + +func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error { + if !compressed { + // uncompressed, just read into the buffer + if _, err := io.ReadFull(rd, buf); err != nil { + return err + } + return nil + } + + // definition + repetition levels are always uncompressed + if levelsBytelen > 0 { + if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil { + return err + } + } + if _, err := p.decompress(p.r, lenCompressed-levelsBytelen, buf[levelsBytelen:]); err != nil { + return err + } + return nil +} + type dataheader interface { IsSetStatistics() bool GetStatistics() *format.Statistics @@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo } continue } - rd.Discard(len(view) - int(remaining) + extra) break } @@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool { return false } - if compressed { - if levelsBytelen > 0 { - io.ReadFull(p.r, buf.Bytes()[:levelsBytelen]) - } - if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil { + if p.cryptoCtx.DataDecryptor != nil { + if err := p.readV2Encrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil { + p.err = err return false } } else { - io.ReadFull(p.r, buf.Bytes()) + if err := p.readV2Unencrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil { + p.err = err + return false + } } if buf.Len() != lenUncompressed {