데이터 엔지니어링(Deep Dive)

[Java DataEngineering] Parquet 파일 쓰고 읽는 코드 정리 Write & Read

직장인B 2022. 11. 25. 10:56

Java를 이용해 Parquet 파일을 쓰고 읽는 코드를 소개한다. 

 

라이브러리 버전 정보

  • org.apache.parquet:parquet-avro:1.12.2
  • org.apache.hadoop:hadoop-common:3.3.4
  • org.apache.hadoop:hadoop-mapreduce-client-core:3.3.4

유의점

 parquet 파일 저장 코드를 소개하는 포스트들은 대게 Path 경로만 넣어서 writer를 빌드하는 방법을 사용한다. 허나 아쉽게도 해당 코드는  Deprecated 처리되었는데 이유는 정확히 모르겠다.. 어쨌거나 Path가 아닌 org.apache.parquet.io.OutputFile (interface) 를 구현한 클래스를 넣어야 한다. 

  • https://issues.apache.org/jira/browse/ORC-937?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17397743#comment-17397743

 파일이 저장되는 경로는 로컬의 파일시스템으로 설정하였다. 

  • Write Parquet
void writeParquetInFileSystem() throws IOException {
    Schema schema = SchemaBuilder
            .record("record")
            .namespace("namespace")
            .fields()
            .name("col1").type().nullable().stringType().noDefault()
            .name("col2").type().nullable().stringType().noDefault()
            .name("col3").type().nullable().stringType().noDefault()
            .endRecord();

    Path path = new org.apache.hadoop.fs.Path("/tmp/file.parquet");
    OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
    ParquetWriter<GenericData.Record> writer = AvroParquetWriter
            .<GenericData.Record>builder(outputFile)
            .withSchema(schema)
            .build();

    GenericData.Record record = new GenericData.Record(schema);
    record.put("col1", "Col1 Data1"); record.put("col2", "Col2 Data1"); record.put("col3", "Col3 Data1");
    writer.write(record);
    record.put("col1", "Col1 Data2"); record.put("col2", "Col2 Data2"); record.put("col3", "Col3 Data2");
    writer.write(record);

    writer.close();
}
  • Read Parquet
void readParquetFromFileSystem() throws IOException {
    Path path = new org.apache.hadoop.fs.Path("/tmp/file.parquet");
    InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
    ParquetFileReader reader = ParquetFileReader.open(inputFile);

    System.out.println("> Metadata Print");
    ParquetMetadata metadata = reader.getFooter();
    System.out.println(metadata);

    System.out.println("> Data Print");
    MessageType messageType = reader
            .getFooter()
            .getFileMetaData()
            .getSchema();
    PageReadStore pageReadStore = null;
    while ( (pageReadStore = reader.readNextFilteredRowGroup()) != null ) {
        final long rows = pageReadStore.getRowCount();
        final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(messageType);
        final RecordReader recordReader = columnIO.getRecordReader(pageReadStore, new GroupRecordConverter(messageType));
        for (int i = 0; i < rows; i++) {
            System.out.println(String.format(">> %d 번째 Record", i+1));
            System.out.print(recordReader.read());
        }
    }

    reader.close();
}

 읽는 방식은 꼭 정해진게 아니라 원하는대로 정의할 수 있다. org.apache.parquet.io.api.RecordMaterializer를 상속받는 클래스를 직접 만들어서 커스텀하면된다. 지금은 parquet 라이브러리에서 예제로 제공하는 GroupRecordConverter를 썼는데, 이걸 썼을 때의 출력 결과는 아래와 같다.  


코드 Github

Apache Parquet 프로젝트 공식 웹사이트