Java를 이용해 Parquet 파일을 쓰고 읽는 코드를 소개한다.
라이브러리 버전 정보
- org.apache.parquet:parquet-avro:1.12.2
- org.apache.hadoop:hadoop-common:3.3.4
- org.apache.hadoop:hadoop-mapreduce-client-core:3.3.4
유의점
parquet 파일 저장 코드를 소개하는 포스트들은 대게 Path 경로만 넣어서 writer를 빌드하는 방법을 사용한다. 허나 아쉽게도 해당 코드는 Deprecated 처리되었는데 이유는 정확히 모르겠다.. 어쨌거나 Path가 아닌 org.apache.parquet.io.OutputFile (interface) 를 구현한 클래스를 넣어야 한다.

- https://issues.apache.org/jira/browse/ORC-937?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17397743#comment-17397743
파일이 저장되는 경로는 로컬의 파일시스템으로 설정하였다.
- Write Parquet
void writeParquetInFileSystem() throws IOException {
Schema schema = SchemaBuilder
.record("record")
.namespace("namespace")
.fields()
.name("col1").type().nullable().stringType().noDefault()
.name("col2").type().nullable().stringType().noDefault()
.name("col3").type().nullable().stringType().noDefault()
.endRecord();
Path path = new org.apache.hadoop.fs.Path("/tmp/file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
ParquetWriter<GenericData.Record> writer = AvroParquetWriter
.<GenericData.Record>builder(outputFile)
.withSchema(schema)
.build();
GenericData.Record record = new GenericData.Record(schema);
record.put("col1", "Col1 Data1"); record.put("col2", "Col2 Data1"); record.put("col3", "Col3 Data1");
writer.write(record);
record.put("col1", "Col1 Data2"); record.put("col2", "Col2 Data2"); record.put("col3", "Col3 Data2");
writer.write(record);
writer.close();
}
- Read Parquet
void readParquetFromFileSystem() throws IOException {
Path path = new org.apache.hadoop.fs.Path("/tmp/file.parquet");
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
ParquetFileReader reader = ParquetFileReader.open(inputFile);
System.out.println("> Metadata Print");
ParquetMetadata metadata = reader.getFooter();
System.out.println(metadata);
System.out.println("> Data Print");
MessageType messageType = reader
.getFooter()
.getFileMetaData()
.getSchema();
PageReadStore pageReadStore = null;
while ( (pageReadStore = reader.readNextFilteredRowGroup()) != null ) {
final long rows = pageReadStore.getRowCount();
final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(messageType);
final RecordReader recordReader = columnIO.getRecordReader(pageReadStore, new GroupRecordConverter(messageType));
for (int i = 0; i < rows; i++) {
System.out.println(String.format(">> %d 번째 Record", i+1));
System.out.print(recordReader.read());
}
}
reader.close();
}
읽는 방식은 꼭 정해진게 아니라 원하는대로 정의할 수 있다. org.apache.parquet.io.api.RecordMaterializer를 상속받는 클래스를 직접 만들어서 커스텀하면된다. 지금은 parquet 라이브러리에서 예제로 제공하는 GroupRecordConverter를 썼는데, 이걸 썼을 때의 출력 결과는 아래와 같다.

코드 Github
Apache Parquet 프로젝트 공식 웹사이트
'데이터 엔지니어링(Deep Dive)' 카테고리의 다른 글
| (pyspark+jupyterlab) Ubuntu내 분석 환경 설정 (0) | 2022.12.03 |
|---|---|
| [Python] Child process 생성 (0) | 2022.12.03 |
| Cursor 와 Loop Query 그리고 Procedure ( in MYSQL ) (1) | 2022.09.23 |
| 정규표현식 활용 Apply Regular Expression #JAVA (0) | 2022.09.02 |
| SIMPLE DEEP LEARNING WITH PYTHON (0) | 2021.08.20 |