# Elasticsearch File Ingest

Stream-based ingestion and transformation of large data files (NDJSON, CSV, Parquet, Arrow IPC) into Elasticsearch.
## Features & Use Cases

- **Stream-based**: handle large files without running out of memory
- **High throughput**: 50k+ documents/second on commodity hardware
- **Cross-version**: seamlessly migrate between ES 8.x and 9.x, or replicate across clusters
- **Formats**: NDJSON, CSV, Parquet, Arrow IPC
- **Transformations**: apply custom JavaScript transforms during ingestion (enrich, split, filter)
- **Reindexing**: copy and transform existing indices (rename fields, restructure documents)
- **Batch processing**: ingest multiple files matching a pattern (e.g., `logs/*.json`)
- **Document splitting**: transform one source document into multiple targets

## Prerequisites

- Elasticsearch 8.x or 9.x accessible (local or remote)
- Node.js 22+ installed

## Setup

This skill is self-contained. The `scripts/` folder and `package.json` live in this skill's directory. Run all commands from this directory, and use absolute paths when referencing data files located elsewhere.

Before first use, install dependencies:

```shell
npm install
```

## Environment Configuration

The Elasticsearch connection is configured via environment variables. The CLI flags `--node`, `--api-key`, `--username`, and `--password` override environment variables when provided.

### Option 1: Elastic Cloud (recommended for production)

```shell
export ELASTICSEARCH_CLOUD_ID="deployment-name:base64encodedcloudid"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
```

### Option 2: Direct URL with API Key

```shell
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
```

### Option 3: Basic Authentication

```shell
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="changeme"
```

### Option 4: Local Development with start-local

For local development and testing, use start-local to quickly spin up Elasticsearch and Kibana using Docker or Podman:

```shell
curl -fsSL https://elastic.co/start-local | sh
```

After installation completes, source the generated `.env` file:

```shell
source elastic-start-local/.env
export ELASTICSEARCH_URL="$ES_LOCAL_URL"
export ELASTICSEARCH_API_KEY="$ES_LOCAL_API_KEY"
```

Optional: skip TLS verification (development only):

```shell
export ELASTICSEARCH_INSECURE="true"
```

## Examples

### Ingest a JSON file

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index
```

### Stream NDJSON/CSV via stdin
NDJSON:

```shell
cat /absolute/path/to/data.ndjson | node scripts/ingest.js --stdin --target my-index
```

CSV:

```shell
cat /absolute/path/to/data.csv | node scripts/ingest.js --stdin --source-format csv --target my-index
```

### Ingest CSV directly

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --target users
```

### Ingest Parquet directly

```shell
node scripts/ingest.js --file /absolute/path/to/users.parquet --source-format parquet --target users
```

### Ingest Arrow IPC directly

```shell
node scripts/ingest.js --file /absolute/path/to/users.arrow --source-format arrow --target users
```

### Ingest CSV with parser options
`csv-options.json`:

```json
{
  "columns": true,
  "delimiter": ";",
  "trim": true
}
```

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
```

### Infer mappings/pipeline from CSV

When using `--infer-mappings`, do not combine it with `--source-format csv`. Inference sends a raw sample to Elasticsearch's `_text_structure/find_structure` endpoint, which returns both mappings and an ingest pipeline with a CSV processor. If `--source-format csv` is also set, CSV is parsed both client-side and server-side, resulting in an empty index. Let `--infer-mappings` handle everything:

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --target users
```

### Infer mappings with options
`infer-options.json`:

```json
{
  "sampleBytes": 200000,
  "lines_to_sample": 2000
}
```

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
```

### Ingest with custom mappings

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
```

### Ingest with transformation

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --transform transform.js
```

### Reindex from another index

```shell
node scripts/ingest.js --source-index old-index --target new-index
```

### Cross-cluster reindex (ES 8.x → 9.x)

```shell
node scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key
```

## Command Reference

### Required Options

| Option | Description |
| --- | --- |
| `--target <index>` | Target index name |
### Source Options (choose one)

| Option | Description |
| --- | --- |
| `--file <path>` | Source file (supports wildcards, e.g., `logs/*.json`) |
| `--source-index <name>` | Source Elasticsearch index |
| `--stdin` | Read NDJSON/CSV from stdin |

### Elasticsearch Connection

| Option | Description |
| --- | --- |
| `--node <url>` | ES node URL (default: `http://localhost:9200`) |
| `--api-key <key>` | API key authentication |
| `--username <user>` | Basic auth username |
| `--password <pass>` | Basic auth password |

### Target Connection (for cross-cluster)

| Option | Description |
| --- | --- |
| `--target-node <url>` | Target ES node URL (uses `--node` if not specified) |
| `--target-api-key <key>` | Target API key |
| `--target-username <user>` | Target username |
| `--target-password <pass>` | Target password |

### Index Configuration

| Option | Description |
| --- | --- |
| `--mappings <file.json>` | Mappings file (auto-copied from source if reindexing) |
| `--infer-mappings` | Infer mappings/pipeline from file/stream (do NOT combine with `--source-format`) |
| `--infer-mappings-options <file>` | Options for inference (JSON file) |
| `--delete-index` | Delete the target index if it exists |
| `--pipeline <name>` | Ingest pipeline name |

### Processing

| Option | Description |
| --- | --- |
| `--transform <file.js>` | Transform function (export as default or `module.exports`) |
| `--query <file.json>` | Query file to filter source documents |
| `--source-format <fmt>` | Source format: `ndjson`\|`csv`\|`parquet`\|`arrow` (default: `ndjson`) |
| `--csv-options <file>` | CSV parser options (JSON file) |
| `--skip-header` | Skip the first line (e.g., a CSV header) |

### Performance

| Option | Description |
| --- | --- |
| `--buffer-size <kb>` | Buffer size in KB (default: 5120) |
| `--search-size <n>` | Docs per search when reindexing (default: 100) |
| `--total-docs <n>` | Total docs for the progress bar (file/stream) |
| `--stall-warn-seconds <n>` | Stall warning threshold (default: 30) |
| `--progress-mode <mode>` | Progress output: `auto`\|`line`\|`newline` (default: `auto`) |
| `--debug-events` | Log pause/resume/stall events |
| `--quiet` | Disable progress bars |
## Transform Functions

Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:

### Basic Transform (transform.js)

```js
// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};
```
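With client-side CSV parsing (`--source-format csv`), field values generally arrive as strings. If you are not using `--infer-mappings`, a transform is a convenient place to coerce types; a minimal sketch with illustrative field names:

```javascript
// Coerce string fields from a CSV row into typed values
// (age, active, and signup_date are illustrative column names)
export default function transform(doc) {
  return {
    ...doc,
    age: doc.age === "" ? null : Number(doc.age),           // numeric column
    active: doc.active === "true",                          // boolean column
    signup_date: new Date(doc.signup_date).toISOString(),   // date column
  };
}
```

Pass the file with `--transform` alongside `--source-format csv`.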
### Skip Documents

Return `null` or `undefined` to skip a document:

```js
export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes("@")) {
    return null;
  }
  return doc;
}
```
### Split Documents

Return an array to create multiple target documents from one source:

```js
export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map((tag) => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}
```
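The patterns above compose: a single transform can skip, enrich, and split. A sketch with illustrative field names, turning one multi-line document into one target document per line:

```javascript
export default function transform(doc) {
  // Skip documents with no body
  if (!doc.text) return null;
  // Emit one enriched document per line of text
  return doc.text.split("\n").map((line, i) => ({
    source_id: doc.id,
    line_number: i + 1,
    line,
  }));
}
```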
## Mappings

### Auto-Copy Mappings (Reindexing)

When reindexing, mappings are automatically copied from the source index:

```shell
node scripts/ingest.js --source-index old-logs --target new-logs
```

### Custom Mappings (mappings.json)

```json
{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
```

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
```
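When the target mappings differ from the shape of the source documents (for example, the nested `user` object above), a transform can restructure documents to match. A sketch assuming flat `user_name`/`user_email` fields in the source:

```javascript
// Move flat fields into the nested "user" object expected by the target mappings
// (user_name and user_email are illustrative source field names)
export default function transform(doc) {
  const { user_name, user_email, ...rest } = doc;
  return {
    ...rest,
    user: { name: user_name, email: user_email },
  };
}
```

Pass it with `--transform` alongside `--mappings` when reindexing.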
## Query Filters

Filter source documents during reindexing with a query file:

Query File (filter.json):

```json
{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
```

```shell
node scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json
```
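The query file accepts any Elasticsearch Query DSL clause, not just `range`; for example, a `bool` query combining a term filter with a date range (field names and values are illustrative):

```json
{
  "bool": {
    "filter": [
      { "term": { "status": "active" } },
      { "range": { "@timestamp": { "gte": "2024-01-01" } } }
    ]
  }
}
```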
## Boundaries

**Never** run destructive commands (such as using the `--delete-index` flag or deleting existing indices and data) without explicit user confirmation.
## Guidelines

- **Never combine `--infer-mappings` with `--source-format`.** Inference creates a server-side ingest pipeline that handles parsing (e.g., a CSV processor). Using `--source-format csv` parses client-side as well, causing double-parsing and an empty index. Use `--infer-mappings` alone for automatic detection, or `--source-format` with explicit `--mappings` for manual control.
- **Use `--source-format csv` with `--mappings`** when you want client-side CSV parsing with known field types.
- **Use `--infer-mappings` alone** when you want Elasticsearch to detect the format, infer field types, and create an ingest pipeline automatically.
## When NOT to Use

Consider alternatives for:

- **Real-time ingestion**: use Filebeat or Elastic Agent
- **Enterprise pipelines**: use Logstash
- **Built-in transforms**: use Elasticsearch Transforms

## Additional Resources

- Common Patterns: detailed examples for CSV loading, migrations, filtering, and more
- Troubleshooting: solutions for common issues

## References

- Elasticsearch Mappings
- Elasticsearch Query DSL