Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 37 additions & 18 deletions parquet/encryption_read_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
// Read all rows in column
i = 0
for int96reader.HasNext() {
var (
val [1]parquet.Int96
)
var val [1]parquet.Int96

// read one value at a time. the number of rows read is returned. values
// read contains the number of non-null rows
Expand Down Expand Up @@ -553,15 +551,34 @@ func (d *TestDecryptionSuite) checkResults(fileName string, decryptionConfig, en
// once the file is read and the second exists in parquet-testing/data folder
func (d *TestDecryptionSuite) TestDecryption() {
tests := []struct {
file string
config uint
file string
config uint
isInDataStorage bool
}{
{"uniform_encryption.parquet.encrypted", 1},
{"encrypt_columns_and_footer.parquet.encrypted", 2},
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3},
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5},
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6},
{"uniform_encryption.parquet.encrypted", 1, true},
{"uniform_encryption.parquet.uncompressed.encrypted", 1, false},
{"uniform_encryption.parquet.v2.encrypted", 1, false},
{"uniform_encryption.parquet.v2.uncompressed.encrypted", 1, false},
{"encrypt_columns_and_footer.parquet.encrypted", 2, true},
{"encrypt_columns_and_footer.parquet.uncompressed.encrypted", 2, false},
{"encrypt_columns_and_footer.parquet.v2.encrypted", 2, false},
{"encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", 2, false},
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3, true},
{"encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", 3, false},
{"encrypt_columns_plaintext_footer.parquet.v2.encrypted", 3, false},
{"encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", 3, false},
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4, true},
{"encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", 4, false},
{"encrypt_columns_and_footer_aad.parquet.v2.encrypted", 4, false},
{"encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", 4, false},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5, true},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", 5, false},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", 5, false},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", 5, false},
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6, true},
{"encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", 6, false},
{"encrypt_columns_and_footer_ctr.parquet.v2.encrypted", 6, false},
{"encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", 6, false},
}
for _, tt := range tests {
d.Run(tt.file, func() {
Expand All @@ -576,14 +593,16 @@ func (d *TestDecryptionSuite) TestDecryption() {
}
os.Remove(tmpFile)

file := path.Join(getDataDir(), tt.file)
d.Require().FileExists(file)
if tt.isInDataStorage {
file := path.Join(getDataDir(), tt.file)
d.Require().FileExists(file)

for idx := range d.decryptionConfigs {
decConfig := idx + 1
d.Run(fmt.Sprintf("config %d", decConfig), func() {
d.checkResults(file, uint(decConfig), tt.config)
})
for idx := range d.decryptionConfigs {
decConfig := idx + 1
d.Run(fmt.Sprintf("config %d", decConfig), func() {
d.checkResults(file, uint(decConfig), tt.config)
})
}
}
})
}
Expand Down
69 changes: 46 additions & 23 deletions parquet/encryption_write_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ import (
* keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
*/

var (
tempdir string
)
var tempdir string

type EncryptionConfigTestSuite struct {
suite.Suite
Expand All @@ -79,13 +77,16 @@ type EncryptionConfigTestSuite struct {
columnEncryptionKey2 string
}

func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string) {
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string, writerOpts ...parquet.WriterProperty) {
filename = filepath.Join(tempdir, filename)

props := parquet.NewWriterProperties(
opts := []parquet.WriterProperty{
parquet.WithPageIndexEnabled(true),
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithEncryptionProperties(configs))
parquet.WithEncryptionProperties(configs),
}
opts = append(opts, writerOpts...)
props := parquet.NewWriterProperties(opts...)
outFile, err := os.Create(filename)
en.Require().NoError(err)
en.Require().NotNil(outFile)
Expand Down Expand Up @@ -135,20 +136,18 @@ func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryption

// write the int64 column, each row repeats twice
int64Writer := nextColumn().(*file.Int64ColumnChunkWriter)
for i := 0; i < 2*en.rowsPerRG; i++ {
for i := 0; i < en.rowsPerRG; i++ {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was getting "batch writing for V2 data pages must start at a row boundary" errors for V2 pages with the original version

var (
defLevel = [1]int16{1}
repLevel = [1]int16{0}
value = int64(i) * 1000 * 1000 * 1000 * 1000
defLevels = []int16{1, 1}
repLevels = []int16{0, 1}
values = []int64{
int64(i*2) * 1000 * 1000 * 1000 * 1000,
int64(i*2+1) * 1000 * 1000 * 1000 * 1000,
}
)
if i%2 == 0 {
repLevel[0] = 0
} else {
repLevel[0] = 1
}

n, err := int64Writer.WriteBatch([]int64{value}, defLevel[:], repLevel[:])
en.EqualValues(1, n)
n, err := int64Writer.WriteBatch(values, defLevels, repLevels)
en.EqualValues(2, n)
en.Require().NoError(err)
}

Expand Down Expand Up @@ -263,7 +262,11 @@ func (en *EncryptionConfigTestSuite) SetupSuite() {
// (uniform encryption)
func (en *EncryptionConfigTestSuite) TestUniformEncryption() {
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"))
en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption config 2: Encrypt Two Columns and the Footer, with different keys
Expand All @@ -273,7 +276,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooter() {
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols))
en.encryptFile(props, "tmp_encrypt_columns_and_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 3: encrypt two columns, with different keys.
Expand All @@ -285,7 +292,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsPlaintextFooter() {
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithPlaintextFooter())
en.encryptFile(props, "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 4: Encrypt two columns and the footer, with different keys
Expand All @@ -296,7 +307,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAadPrefix(en.fileName))
en.encryptFile(props, "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 5: Encrypt Two columns and the footer, with different keys
Expand All @@ -307,7 +322,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName), parquet.DisableAadPrefixStorage())
en.encryptFile(props, "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 6: Encrypt two columns and the footer, with different keys.
Expand All @@ -318,7 +337,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterAesGcmCtr() {
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))
en.encryptFile(props, "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

func TestFileEncryption(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions parquet/file/column_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,9 +551,11 @@ func (p *PrimitiveReaderSuite) TestRepetitionLvlBytesWithMaxRepZero() {
// bytes: the page header reports 1 byte for repetition levels even
// though the max rep level is 0. If that byte isn't skipped then
// we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0].
pageData := [...]byte{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
pageData := [...]byte{
0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}

p.pages = append(p.pages, file.NewDataPageV2(memory.NewBufferBytes(pageData[:]), batchSize, 1, batchSize,
parquet.Encodings.DeltaBinaryPacked, 2, 1, int32(len(pageData)), false))
Expand Down Expand Up @@ -733,7 +735,6 @@ func TestFullSeekRow(t *testing.T) {

for _, dataPageVersion := range []parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), func(t *testing.T) {

props := parquet.NewWriterProperties(parquet.WithAllocator(mem),
parquet.WithDataPageVersion(dataPageVersion), parquet.WithDataPageSize(1),
parquet.WithPageIndexEnabled(true))
Expand Down
5 changes: 3 additions & 2 deletions parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() {

p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
p.dataPageHdrV2.NumValues = nrows
p.WriteDataPageHeaderV2(1024, 20, 10)
p.WriteDataPageHeaderV2(1024, 0, 0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original version of serializedPageReader.Next() ignored the error returned by io.ReadFull. Was that deliberate? I've added the error check, and p.pageReader.Next() now returns false when the compressed and uncompressed values are not 0, so I changed them to 0.

p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
p.True(p.pageReader.Next())
p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
Expand Down Expand Up @@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() {

func TestWithEOFReader(t *testing.T) {
root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1)}, -1)
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
}, -1)
props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))

var buf bytes.Buffer
Expand Down
58 changes: 51 additions & 7 deletions parquet/file/page_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
return p.codec.Decode(buf, data), nil
}

// readV2Encrypted reads the lenCompressed-byte body of an encrypted V2 data
// page from rd, decrypts it with p.cryptoCtx.DataDecryptor and stores the
// plaintext page in buf.
//
// The whole body is decrypted in one piece before anything else happens.
// When compressed is false the decrypted bytes are copied to buf as-is.
// Otherwise the first levelsBytelen bytes (the repetition and definition
// levels, which are never compressed) are copied verbatim and the remainder
// is decoded with p.codec into buf[levelsBytelen:].
//
// It returns any error from reading rd, or io.ErrUnexpectedEOF when the
// decrypted body is shorter than levelsBytelen.
func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
	// if encrypted, we need to decrypt before decompressing
	p.decompressBuffer.ResizeNoShrink(lenCompressed)
	b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
	if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
		return err
	}

	// Decrypt what was actually read: bytes.Buffer may move to a fresh backing
	// array while filling, in which case p.decompressBuffer would not hold
	// the complete ciphertext.
	data := p.cryptoCtx.DataDecryptor.Decrypt(b.Bytes())

	// encrypted + uncompressed -> just copy the decrypted data to output buffer
	if !compressed {
		copy(buf, data)
		return nil
	}

	// definition + repetition levels are always uncompressed
	if levelsBytelen > 0 {
		if levelsBytelen > len(data) {
			// the header promises more level bytes than the page contains
			return io.ErrUnexpectedEOF
		}
		copy(buf, data[:levelsBytelen])
		data = data[levelsBytelen:]
	}

	// NOTE(review): the slice returned by Decode is discarded; this presumes
	// the codec writes into the destination it is handed — confirm.
	p.codec.Decode(buf[levelsBytelen:], data)
	return nil
}

// readV2Unencrypted reads the body of a V2 data page that has no data
// decryptor configured from rd into buf.
//
// When compressed is false, buf is filled directly from rd (len(buf) bytes;
// lenCompressed is not consulted). Otherwise the first levelsBytelen bytes
// (the repetition and definition levels, which are never compressed) are
// read verbatim and the remaining lenCompressed-levelsBytelen bytes are
// passed through p.decompress into buf[levelsBytelen:].
//
// It returns the first read or decompression error encountered.
func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
	if !compressed {
		// uncompressed, just read into the buffer
		_, err := io.ReadFull(rd, buf)
		return err
	}

	// definition + repetition levels are always uncompressed
	if levelsBytelen > 0 {
		if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil {
			return err
		}
	}

	// Keep reading from rd (not p.r) so the levels and the compressed payload
	// always come from the same stream.
	_, err := p.decompress(rd, lenCompressed-levelsBytelen, buf[levelsBytelen:])
	return err
}

type dataheader interface {
IsSetStatistics() bool
GetStatistics() *format.Statistics
Expand Down Expand Up @@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo
}
continue
}

rd.Discard(len(view) - int(remaining) + extra)
break
}
Expand Down Expand Up @@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool {
return false
}

if compressed {
if levelsBytelen > 0 {
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
}
if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
if p.cryptoCtx.DataDecryptor != nil {
if err := p.readV2Encrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
p.err = err
return false
}
} else {
io.ReadFull(p.r, buf.Bytes())
if err := p.readV2Unencrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
p.err = err
return false
}
}
Comment on lines +858 to 868
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the issue is simply that we're not passing uncompressed v2 through the decryptor right?

Wouldn't a simpler solution here just be to add the decryption check to the non-compressed branch?

i.e. since the call to decompress will already decrypt the data, we just need to replace the io.ReadFull(p.r, buf.Bytes()) with code to check if we need to decrypt and then decrypt it here, otherwise just leave the io.ReadFull right?


if buf.Len() != lenUncompressed {
Expand Down