es-ingest

Stream-based ingestion and transformation of large data files into Elasticsearch. Built on node-es-transformer.


Install the skill:

npx skills add walterra/agent-tools/walterra-agent-tools-es-ingest


Features

  • ✅ Stream-based: Handle very large files (20-30 GB tested) without running out of memory

  • ✅ High throughput: Up to 20k documents/second on commodity hardware

  • ✅ Cross-version: Seamlessly migrate between ES 8.x and 9.x

  • ✅ Formats: NDJSON, CSV, Parquet, Arrow IPC

  • ✅ Transformations: Apply custom JavaScript transforms during ingestion

  • ✅ Reindexing: Copy and transform existing indices

  • ✅ Wildcards: Ingest multiple files matching a pattern (e.g., logs/*.json)

  • ✅ Document splitting: Transform one source document into multiple targets

Prerequisites

  • Elasticsearch 8.x or 9.x accessible (local or remote)

  • Node.js 22+ installed

Setup

Before first use, install dependencies:

cd {baseDir} && npm install

Basic Usage

Ingest a JSON file

{baseDir}/scripts/ingest.js --file data.json --target my-index

Stream NDJSON/CSV via stdin

NDJSON

cat data.ndjson | {baseDir}/scripts/ingest.js --stdin --target my-index

CSV

cat data.csv | {baseDir}/scripts/ingest.js --stdin --source-format csv --target my-index

Ingest CSV directly

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --target users

Ingest Parquet directly

{baseDir}/scripts/ingest.js --file users.parquet --source-format parquet --target users

Ingest Arrow IPC directly

{baseDir}/scripts/ingest.js --file users.arrow --source-format arrow --target users

CSV/Parquet/Arrow support requires node-es-transformer >= 1.2.0.

Ingest CSV with parser options

csv-options.json

{
  "columns": true,
  "delimiter": ";",
  "trim": true
}

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --csv-options csv-options.json --target users
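For intuition about what these options do, here is a minimal sketch that mimics csv-parse-style behavior on a semicolon-delimited sample. This is illustrative only; the actual parsing happens inside the ingest script.

```javascript
// Illustrates what columns/delimiter/trim mean for a CSV sample.
const sample = 'name; age\nAlice; 30\nBob; 25';
const options = { columns: true, delimiter: ';', trim: true };

function parseCsv(text, { columns, delimiter, trim }) {
  // Split into rows, then cells, trimming each cell if requested.
  const rows = text.split('\n').map((line) =>
    line.split(delimiter).map((cell) => (trim ? cell.trim() : cell))
  );
  if (!columns) return rows;
  // columns: true turns the first row into object keys.
  const [header, ...data] = rows;
  return data.map((row) =>
    Object.fromEntries(header.map((key, i) => [key, row[i]]))
  );
}

console.log(parseCsv(sample, options));
// → [ { name: 'Alice', age: '30' }, { name: 'Bob', age: '25' } ]
```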

Infer mappings/pipeline from CSV

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --target users

Infer mappings with options

infer-options.json

{
  "sampleBytes": 200000,
  "lines_to_sample": 2000
}

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --infer-mappings-options infer-options.json --target users

Ingest with custom mappings

{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json

Ingest with transformation

{baseDir}/scripts/ingest.js --file data.json --target my-index --transform transform.js

Reindex from another index

{baseDir}/scripts/ingest.js --source-index old-index --target new-index

Cross-cluster reindex (ES 8.x → 9.x)

{baseDir}/scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key

Command Reference

Required Options

--target <index> # Target index name

Source Options (choose one)

--file <path>          # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name>  # Source Elasticsearch index
--stdin                # Read NDJSON/CSV from stdin

Elasticsearch Connection

--node <url>       # ES node URL (default: http://localhost:9200)
--api-key <key>    # API key authentication
--username <user>  # Basic auth username
--password <pass>  # Basic auth password

Target Connection (for cross-cluster)

--target-node <url>       # Target ES node URL (uses --node if not specified)
--target-api-key <key>    # Target API key
--target-username <user>  # Target username
--target-password <pass>  # Target password

Index Configuration

--mappings <file.json>           # Mappings file (auto-copied from source when reindexing)
--infer-mappings                 # Infer mappings/pipeline from file/stream
--infer-mappings-options <file>  # Options for inference (JSON file)
--delete-index                   # Delete target index if it exists
--pipeline <name>                # Ingest pipeline name

Processing

--transform <file.js>   # Transform function (export as default or module.exports)
--query <file.json>     # Query file to filter source documents
--source-format <fmt>   # Source format: ndjson|csv|parquet|arrow (default: ndjson)
--csv-options <file>    # CSV parser options (JSON file)
--skip-header           # Skip first line (e.g., CSV header)

Performance

--buffer-size <kb>        # Buffer size in KB (default: 5120)
--search-size <n>         # Docs per search when reindexing (default: 100)
--total-docs <n>          # Total docs for progress bar (file/stream)
--stall-warn-seconds <n>  # Stall warning threshold (default: 30)
--progress-mode <mode>    # Progress output: auto|line|newline (default: auto)
--debug-events            # Log pause/resume/stall events
--quiet                   # Disable progress bars

Transform Functions

Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:

Basic Transform (transform.js)

// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};

Skip Documents

Return null or undefined to skip a document:

export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes('@')) {
    return null;
  }
  return doc;
}
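A skip transform can be checked without touching Elasticsearch by running it over an in-memory batch. This sketch assumes, as stated above, that null/undefined results are simply dropped from the output:

```javascript
// Same skip logic as above, exercised on sample documents.
function transform(doc) {
  if (!doc.email || !doc.email.includes('@')) return null;
  return doc;
}

const docs = [
  { email: 'ada@example.com' },
  { email: 'not-an-email' },
  { name: 'no email field' },
];

// Drop null/undefined results, mirroring the skip behavior.
const kept = docs.map(transform).filter((d) => d != null);
console.log(kept.length); // → 1
```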

Split Documents

Return an array to create multiple target documents from one source:

export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map(tag => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}
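The splitting behavior can be verified locally by calling the same function on a sample tweet:

```javascript
// Standalone run of the hashtag-splitting transform above.
function transform(doc) {
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map((tag) => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}

const tweet = {
  id: 42,
  text: 'Shipping fast with #elasticsearch and #nodejs',
  created_at: '2024-01-01T00:00:00Z',
};

// One source document becomes one target document per hashtag.
const out = transform(tweet);
console.log(out.length); // → 2
console.log(out[0].hashtag); // → '#elasticsearch'
```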

Mappings

Auto-Copy Mappings (Reindexing)

When reindexing, mappings are automatically copied from the source index:

{baseDir}/scripts/ingest.js --source-index old-logs --target new-logs

Custom Mappings (mappings.json)

{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}

{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json

Query Filters

Filter source documents during reindexing with a query file:

Query File (filter.json)

{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}

{baseDir}/scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json

Common Patterns

Pattern 1: Load CSV with custom mappings

1. Create mappings.json with your schema

cat > mappings.json << 'EOF'
{
  "properties": {
    "timestamp": { "type": "date" },
    "user_id": { "type": "keyword" },
    "action": { "type": "keyword" },
    "value": { "type": "double" }
  }
}
EOF

2. Ingest CSV (skip header row)

{baseDir}/scripts/ingest.js \
  --file events.csv \
  --target events \
  --mappings mappings.json \
  --skip-header

Pattern 2: Migrate ES 8.x → 9.x with transforms

1. Create transform.js to update document structure

cat > transform.js << 'EOF'
export default function transform(doc) {
  // Update field names for ES 9.x compatibility
  return {
    ...doc,
    '@timestamp': doc.timestamp, // Rename field
    user: {
      id: doc.user_id,
      name: doc.user_name,
    },
  };
}
EOF

2. Migrate with transform

{baseDir}/scripts/ingest.js \
  --source-index logs \
  --node https://es8-cluster.example.com:9200 \
  --api-key $ES8_API_KEY \
  --target logs-v2 \
  --target-node https://es9-cluster.example.com:9200 \
  --target-api-key $ES9_API_KEY \
  --transform transform.js

Pattern 3: Reindex with filtering and deletion

1. Create filter query

cat > filter.json << 'EOF'
{
  "bool": {
    "must": [
      { "range": { "@timestamp": { "gte": "now-30d" } } }
    ],
    "must_not": [
      { "term": { "status": "deleted" } }
    ]
  }
}
EOF

2. Reindex with filter (delete old index first)

{baseDir}/scripts/ingest.js \
  --source-index logs-raw \
  --target logs-filtered \
  --query filter.json \
  --delete-index

Pattern 4: Batch ingest multiple files

Ingest all JSON files in a directory

{baseDir}/scripts/ingest.js \
  --file "logs/*.json" \
  --target combined-logs \
  --mappings mappings.json

Pattern 5: Document enrichment during ingestion

1. Create enrichment transform

cat > enrich.js << 'EOF'
export default function transform(doc) {
  return {
    ...doc,
    enriched_at: new Date().toISOString(),
    source: 'batch-import',
    year: new Date(doc.timestamp).getFullYear(),
  };
}
EOF

2. Ingest with enrichment

{baseDir}/scripts/ingest.js \
  --file data.json \
  --target enriched-data \
  --transform enrich.js

Performance Tuning

For Large Files (>5GB)

Increase buffer size for better throughput

{baseDir}/scripts/ingest.js \
  --file huge-file.json \
  --target my-index \
  --buffer-size 10240   # 10 MB buffer

For Slow Networks

Reduce batch size to avoid timeouts

{baseDir}/scripts/ingest.js \
  --source-index remote-logs \
  --target local-logs \
  --search-size 50   # Smaller batches

Quiet Mode (for scripts)

Disable progress bars for automated scripts

{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --quiet

When to Use

Use this skill when you need to:

  • Load large files into Elasticsearch without memory issues

  • Migrate indices between ES versions (8.x ↔ 9.x)

  • Transform data during ingestion (enrich, split, filter)

  • Reindex with modifications (rename fields, restructure documents)

  • Batch process multiple files matching a pattern

  • Cross-cluster replication with transformations

When NOT to Use

Consider alternatives for:

  • Real-time ingestion: Use Filebeat or Elastic Agent

  • Enterprise pipelines: Use Logstash

  • Built-in transforms: Use Elasticsearch Transforms

  • Small files (<100MB): Direct Elasticsearch API calls may be simpler

Troubleshooting

Connection refused

Elasticsearch is not running or URL is incorrect:

Test connection

curl http://localhost:9200

Or with auth

curl -H "Authorization: ApiKey $API_KEY" https://es.example.com:9200

Out of memory errors

Reduce buffer size:

{baseDir}/scripts/ingest.js --file data.json --target my-index --buffer-size 2048

Transform function not loading

Ensure the transform file exports correctly:

// ✓ Correct (ES modules)
export default function transform(doc) { /* ... */ }

// ✓ Correct (CommonJS)
module.exports = function transform(doc) { /* ... */ }

// ✗ Wrong: defined but never exported
function transform(doc) { /* ... */ }
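As a rough sketch of why the last form fails (the exact loader logic in ingest.js may differ): a loader can only use what a module actually exports, so a function that is defined but never exported resolves to nothing:

```javascript
// What a loader sees for each export style (simulated module objects).
const esmStyle = { default: function transform(doc) { return doc; } };
const cjsStyle = function transform(doc) { return doc; };
const wrongStyle = {}; // file defined a function but never exported it

// Accept either a bare function (CommonJS) or a default export (ESM).
function resolveTransform(mod) {
  const fn = typeof mod === 'function' ? mod : mod.default;
  return typeof fn === 'function' ? fn : null;
}

console.log(resolveTransform(esmStyle) !== null);   // → true
console.log(resolveTransform(cjsStyle) !== null);   // → true
console.log(resolveTransform(wrongStyle) !== null); // → false
```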

Mapping conflicts

Delete and recreate the index:

{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --mappings mappings.json \
  --delete-index

References

  • node-es-transformer GitHub

  • Elasticsearch Mappings

  • Elasticsearch Query DSL

  • Performance Tuning Guide

