parquet-coder

Patterns for efficient columnar data storage with Parquet.


Basic Operations

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

Write with compression

df.to_parquet('data.parquet', compression='snappy', index=False)

Common compression options:

- snappy: Fast, good compression (default)

- gzip: Slower, better compression

- zstd: Best balance of speed/compression

- None: No compression (fastest writes)
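Codec choice is easy to measure on your own data. A minimal sketch (the DataFrame here is a stand-in and the printed sizes/timings will depend entirely on your data):

import os
import time

import pandas as pd

df = pd.DataFrame({'id': range(1_000_000), 'value': range(1_000_000)})  # stand-in data

for codec in ['snappy', 'gzip', 'zstd', None]:
    path = f'compare_{codec}.parquet'
    start = time.perf_counter()
    df.to_parquet(path, compression=codec, index=False)
    elapsed = time.perf_counter() - start
    print(f'{codec}: {os.path.getsize(path) / 1e6:.1f} MB in {elapsed:.2f}s')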

Read entire file

df = pd.read_parquet('data.parquet')

Read specific columns only (column pruning)

df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])

PyArrow for Large Files

Read as PyArrow Table (more memory efficient)

table = pq.read_table('data.parquet')

Convert to pandas when needed

df = table.to_pandas()

Filter while reading (row group filtering)

table = pq.read_table(
    'data.parquet',
    filters=[
        ('date', '>=', '2024-01-01'),
        ('status', '=', 'active')
    ]
)

Read in batches for huge files

parquet_file = pq.ParquetFile('huge.parquet')
for batch in parquet_file.iter_batches(batch_size=100_000):
    df_batch = batch.to_pandas()
    process(df_batch)

Partitioned Datasets

Write partitioned by columns

df.to_parquet(
    'data/',
    partition_cols=['year', 'month'],
    compression='snappy'
)

Creates: data/year=2024/month=01/part-0.parquet

Read partitioned dataset

df = pd.read_parquet('data/') # Reads all partitions

Read specific partitions only

df = pd.read_parquet('data/year=2024/')

With PyArrow dataset API (more control)

import pyarrow.dataset as ds

dataset = ds.dataset('data/', format='parquet', partitioning='hive')

Filter on partition columns (very fast)

table = dataset.to_table(
    filter=(ds.field('year') == 2024) & (ds.field('month') >= 6)
)
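Projection and filtering combine in a single to_table call, so only the needed columns of the matching fragments are read. A sketch against the dataset above (the column names are assumptions):

table = dataset.to_table(
    columns=['id', 'value'],           # projection: read only these columns
    filter=(ds.field('year') == 2024)  # pruning: skip non-matching partitions/row groups
)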

Schema Definition

Explicit schema for consistency

schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64()),
    ('date', pa.date32()),
    ('tags', pa.list_(pa.string())),
    ('metadata', pa.map_(pa.string(), pa.string())),
])

Write with schema

table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, 'data.parquet')

Read and validate schema

file_schema = pq.read_schema('data.parquet')
assert file_schema.equals(schema), "Schema mismatch!"

Schema Evolution

def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """Create a unified schema from old and new."""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # Handle type conflicts (e.g., promote int to float).
            # promote_type is not defined here; a sketch follows this function.
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))
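merge_schemas relies on a promote_type helper that the skill does not define. A minimal sketch of one, assuming only numeric conflicts need real promotion and everything else can fall back to string:

def promote_type(a: pa.DataType, b: pa.DataType) -> pa.DataType:
    """Pick a type wide enough for both inputs (numeric-only assumption)."""
    if a == b:
        return a

    def is_numeric(t: pa.DataType) -> bool:
        return pa.types.is_integer(t) or pa.types.is_floating(t)

    if is_numeric(a) and is_numeric(b):
        # Any float involved -> float64, otherwise widen to int64
        if pa.types.is_floating(a) or pa.types.is_floating(b):
            return pa.float64()
        return pa.int64()
    # Fall back to string for non-numeric conflicts
    return pa.string()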

def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """Append data with automatic schema evolution."""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)

    # Unify schemas
    unified_schema = pa.unify_schemas([existing.schema, new_table.schema])

    # Cast both to the unified schema
    existing = existing.cast(unified_schema)
    new_table = new_table.cast(unified_schema)

    # Concatenate and write
    combined = pa.concat_tables([existing, new_table])
    pq.write_table(combined, output_path)

Row Group Optimization

Control row group size (affects read performance)

pq.write_table(
    table,
    'data.parquet',
    row_group_size=100_000,  # Rows per group
    compression='snappy'
)
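There is no single right row_group_size; one common heuristic is to target a roughly fixed in-memory size per group. A sketch (the 128 MB target is an assumption, not a figure from this skill):

target_bytes = 128 * 1024 * 1024              # assumed target per row group
avg_row_bytes = table.nbytes / table.num_rows
rows_per_group = max(1, int(target_bytes // avg_row_bytes))

pq.write_table(table, 'data.parquet', row_group_size=rows_per_group)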

Read metadata to see row groups

parquet_file = pq.ParquetFile('data.parquet')
print(f"Num row groups: {parquet_file.metadata.num_row_groups}")
print(f"Num rows: {parquet_file.metadata.num_rows}")

Read specific row groups

table = parquet_file.read_row_groups([0, 1]) # First two groups

Metadata and Statistics

Add custom metadata

custom_metadata = {
    b'created_by': b'etl_pipeline',
    b'version': b'1.0',
    b'source': b'api_export'
}
schema = table.schema.with_metadata(custom_metadata)
table = table.cast(schema)
pq.write_table(table, 'data.parquet')

Read metadata

parquet_file = pq.ParquetFile('data.parquet')
print(parquet_file.schema_arrow.metadata)

Column statistics (min/max for filtering)

metadata = parquet_file.metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics:
            print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
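These statistics are what filters= uses to skip row groups; you can also apply them manually with read_row_groups from the previous section. A sketch, assuming a flat schema and a column that actually carries min/max statistics:

def row_groups_matching(parquet_path: str, column: str, value) -> list:
    """Indices of row groups whose min/max range could contain value."""
    pf = pq.ParquetFile(parquet_path)
    keep = []
    for i in range(pf.metadata.num_row_groups):
        rg = pf.metadata.row_group(i)
        stats = None
        for j in range(rg.num_columns):
            if rg.column(j).path_in_schema == column:
                stats = rg.column(j).statistics
                break
        if stats is None or not stats.has_min_max:
            keep.append(i)  # No statistics: the group must be read
        elif stats.min <= value <= stats.max:
            keep.append(i)
    return keep

groups = row_groups_matching('data.parquet', 'id', 12345)
table = pq.ParquetFile('data.parquet').read_row_groups(groups)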

Delta Lake Integration

from deltalake import DeltaTable, write_deltalake

Write as Delta table (versioned parquet)

write_deltalake('delta_table/', df, mode='overwrite')

Append data

write_deltalake('delta_table/', new_df, mode='append')

Read Delta table

dt = DeltaTable('delta_table/')
df = dt.to_pandas()
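For larger tables you can stay in PyArrow and reuse the filter pushdown shown earlier. A sketch, assuming the deltalake package's to_pyarrow_dataset method and a hypothetical status column:

import pyarrow.dataset as ds

delta_ds = dt.to_pyarrow_dataset()
active = delta_ds.to_table(filter=(ds.field('status') == 'active')).to_pandas()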

Time travel

df_old = DeltaTable('delta_table/', version=0).to_pandas()

Compact small files

dt.optimize.compact()

Vacuum old versions

dt.vacuum(retention_hours=168, dry_run=False)  # Keep 7 days; dry_run defaults to True

Performance Tips

1. Use appropriate column types

schema = pa.schema([
    ('category', pa.dictionary(pa.int8(), pa.string())),  # For repeated strings
    ('count', pa.int32()),  # Not int64 if values fit
])

2. Sort data before writing (tighter min/max statistics improve row-group pruning)

df = df.sort_values(['date', 'category'])
df.to_parquet('sorted.parquet')

3. Use column selection

df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])

4. Use filters for row pruning

df = pd.read_parquet('data/', filters=[('status', '==', 'active')])

5. Parallel reads

import pyarrow.dataset as ds

dataset = ds.dataset('data/', format='parquet')
table = dataset.to_table(use_threads=True)

Conversion Utilities

def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large CSV to Parquet efficiently."""
    writer = None
    schema = None

    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)

        if writer is None:
            schema = table.schema
            writer = pq.ParquetWriter(parquet_path, schema)

        # Cast to the first chunk's schema so dtype drift between chunks doesn't break the write
        writer.write_table(table.cast(schema))

    if writer is not None:
        writer.close()

def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """Convert JSON Lines to Parquet."""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')
