# elasticsearch-file-ingest

Installs: 99
Rank: #8357

## Install

```shell
npx skills add https://github.com/elastic/agent-skills --skill elasticsearch-file-ingest
```
# Elasticsearch File Ingest

Stream-based ingestion and transformation of large data files (NDJSON, CSV, Parquet, Arrow IPC) into Elasticsearch.

## Features & Use Cases

- **Stream-based**: Handle large files without running out of memory
- **High throughput**: 50k+ documents/second on commodity hardware
- **Cross-version**: Seamlessly migrate between ES 8.x and 9.x, or replicate across clusters
- **Formats**: NDJSON, CSV, Parquet, Arrow IPC
- **Transformations**: Apply custom JavaScript transforms during ingestion (enrich, split, filter)
- **Reindexing**: Copy and transform existing indices (rename fields, restructure documents)
- **Batch processing**: Ingest multiple files matching a pattern (e.g., `logs/*.json`)
- **Document splitting**: Transform one source document into multiple targets

## Prerequisites

- Elasticsearch 8.x or 9.x accessible (local or remote)
- Node.js 22+ installed

## Setup

This skill is self-contained: the `scripts/` folder and `package.json` live in this skill's directory. Run all commands from this directory, and use absolute paths when referencing data files located elsewhere.

Before first use, install dependencies:

```shell
npm install
```

## Environment Configuration

The Elasticsearch connection is configured via environment variables. The CLI flags `--node`, `--api-key`, `--username`, and `--password` override environment variables when provided.

### Option 1: Elastic Cloud (recommended for production)

```shell
export ELASTICSEARCH_CLOUD_ID="deployment-name:base64encodedcloudid"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
```

### Option 2: Direct URL with API Key

```shell
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
```

### Option 3: Basic Authentication

```shell
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="changeme"
```

### Option 4: Local Development with start-local

For local development and testing, use start-local to quickly spin up Elasticsearch and Kibana using Docker or Podman:

```shell
curl -fsSL https://elastic.co/start-local | sh
```

After installation completes, source the generated `.env` file:

```shell
source elastic-start-local/.env
export ELASTICSEARCH_URL="$ES_LOCAL_URL"
export ELASTICSEARCH_API_KEY="$ES_LOCAL_API_KEY"
```

### Optional: Skip TLS verification (development only)

```shell
export ELASTICSEARCH_INSECURE="true"
```

## Examples

### Ingest a JSON file

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index
```

### Stream NDJSON/CSV via stdin

NDJSON:

```shell
cat /absolute/path/to/data.ndjson | node scripts/ingest.js --stdin --target my-index
```

CSV:

```shell
cat /absolute/path/to/data.csv | node scripts/ingest.js --stdin --source-format csv --target my-index
```

### Ingest CSV directly

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --target users
```

### Ingest Parquet directly

```shell
node scripts/ingest.js --file /absolute/path/to/users.parquet --source-format parquet --target users
```

### Ingest Arrow IPC directly

```shell
node scripts/ingest.js --file /absolute/path/to/users.arrow --source-format arrow --target users
```

### Ingest CSV with parser options

`csv-options.json`:

```json
{
  "columns": true,
  "delimiter": ";",
  "trim": true
}
```
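As a rough illustration of what these options do (assuming they are passed through to a csv-parse-style parser): `columns: true` uses the first row as field names, `delimiter` sets the separator, and `trim` strips surrounding whitespace. A hypothetical pure-JS equivalent, not the skill's actual parser:

```javascript
// Hypothetical re-implementation of the three options above, for
// illustration only; it ignores quoting and other real-CSV concerns.
function parseCsv(text, { columns = false, delimiter = ",", trim = false } = {}) {
  const clean = (v) => (trim ? v.trim() : v);
  const rows = text
    .split("\n")
    .filter((line) => line !== "")
    .map((line) => line.split(delimiter).map(clean));
  if (!columns) return rows;
  const [header, ...body] = rows; // first row becomes the field names
  return body.map((r) => Object.fromEntries(header.map((h, i) => [h, r[i]])));
}

const docs = parseCsv("name; age\nalice; 30\nbob ;41\n",
  { columns: true, delimiter: ";", trim: true });
console.log(docs); // [{ name: 'alice', age: '30' }, { name: 'bob', age: '41' }]
```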

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
```

### Infer mappings/pipeline from CSV

When using `--infer-mappings`, do not combine it with `--source-format csv`. Inference sends a raw sample to Elasticsearch's `_text_structure/find_structure` endpoint, which returns both mappings and an ingest pipeline with a CSV processor. If `--source-format csv` is also set, CSV is parsed both client-side and server-side, resulting in an empty index. Let `--infer-mappings` handle everything:

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --target users
```

### Infer mappings with options

`infer-options.json`:

```json
{
  "sampleBytes": 200000,
  "lines_to_sample": 2000
}
```

```shell
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
```

### Ingest with custom mappings

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
```

### Ingest with transformation

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --transform transform.js
```

### Reindex from another index

```shell
node scripts/ingest.js --source-index old-index --target new-index
```

### Cross-cluster reindex (ES 8.x → 9.x)

```shell
node scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key
```

## Command Reference

### Required Options

| Flag | Description |
| --- | --- |
| `--target <index>` | Target index name |

### Source Options (choose one)

| Flag | Description |
| --- | --- |
| `--file <path>` | Source file (supports wildcards, e.g., `logs/*.json`) |
| `--source-index <name>` | Source Elasticsearch index |
| `--stdin` | Read NDJSON/CSV from stdin |

### Elasticsearch Connection

| Flag | Description |
| --- | --- |
| `--node <url>` | ES node URL (default: `http://localhost:9200`) |
| `--api-key <key>` | API key authentication |
| `--username <user>` | Basic auth username |
| `--password <pass>` | Basic auth password |

### Target Connection (for cross-cluster)

| Flag | Description |
| --- | --- |
| `--target-node <url>` | Target ES node URL (uses `--node` if not specified) |
| `--target-api-key <key>` | Target API key |
| `--target-username <user>` | Target username |
| `--target-password <pass>` | Target password |

### Index Configuration

| Flag | Description |
| --- | --- |
| `--mappings <file.json>` | Mappings file (auto-copied from source if reindexing) |
| `--infer-mappings` | Infer mappings/pipeline from file/stream (do NOT combine with `--source-format`) |
| `--infer-mappings-options <file>` | Options for inference (JSON file) |
| `--delete-index` | Delete target index if it exists |
| `--pipeline <name>` | Ingest pipeline name |

### Processing

| Flag | Description |
| --- | --- |
| `--transform <file.js>` | Transform function (export as default or `module.exports`) |
| `--query <file.json>` | Query file to filter source documents |
| `--source-format <fmt>` | Source format: `ndjson`\|`csv`\|`parquet`\|`arrow` (default: `ndjson`) |
| `--csv-options <file>` | CSV parser options (JSON file) |
| `--skip-header` | Skip first line (e.g., CSV header) |

### Performance

| Flag | Description |
| --- | --- |
| `--buffer-size <kb>` | Buffer size in KB (default: 5120) |
| `--search-size <n>` | Docs per search when reindexing (default: 100) |
| `--total-docs <n>` | Total docs for progress bar (file/stream) |
| `--stall-warn-seconds <n>` | Stall warning threshold (default: 30) |
| `--progress-mode <mode>` | Progress output: `auto`\|`line`\|`newline` (default: `auto`) |
| `--debug-events` | Log pause/resume/stall events |
| `--quiet` | Disable progress bars |
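The connection flags above override environment variables when both are set. A small sketch of that precedence rule (the helper name and shape are illustrative, not the skill's actual code):

```javascript
// Resolve the effective connection config: CLI flags win over env vars,
// and the node URL falls back to the documented default.
function resolveConnection(flags, env) {
  return {
    node: flags.node ?? env.ELASTICSEARCH_URL ?? "http://localhost:9200",
    apiKey: flags.apiKey ?? env.ELASTICSEARCH_API_KEY,
    username: flags.username ?? env.ELASTICSEARCH_USERNAME,
    password: flags.password ?? env.ELASTICSEARCH_PASSWORD,
  };
}

const conn = resolveConnection(
  { node: "https://es9.example.com:9200" },          // from --node
  { ELASTICSEARCH_URL: "https://elasticsearch:9200", // from the environment
    ELASTICSEARCH_API_KEY: "base64encodedapikey" }
);
console.log(conn.node);   // the flag wins
console.log(conn.apiKey); // falls back to the env var
```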

## Transform Functions

Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function.

### Basic Transform (transform.js)

```javascript
// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};
```

### Skip Documents

Return `null` or `undefined` to skip a document:

```javascript
export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes("@")) {
    return null;
  }
  return doc;
}
```

### Split Documents

Return an array to create multiple target documents from one source:

```javascript
export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map((tag) => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}
```
## Mappings

### Auto-Copy Mappings (Reindexing)

When reindexing, mappings are automatically copied from the source index:

```shell
node scripts/ingest.js --source-index old-logs --target new-logs
```
### Custom Mappings (mappings.json)

```json
{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
```

```shell
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
```
## Query Filters

Filter source documents during reindexing with a query file:

### Query File (filter.json)

```json
{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
```

```shell
node scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json
```
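Conceptually, the query file's contents become the `query` clause of the search request that pages through the source index. A hypothetical sketch of that wrapping (the helper and its shape are illustrative, not the skill's actual code):

```javascript
// The query file's contents (the range filter shown above).
const filter = {
  range: { "@timestamp": { gte: "2024-01-01", lt: "2024-02-01" } },
};

// Hypothetical: how an ingest script might wrap the filter into the
// search body used to read the source index; size mirrors --search-size.
function buildSearchBody(query, searchSize = 100) {
  return { query: query ?? { match_all: {} }, size: searchSize };
}

const body = buildSearchBody(filter);
console.log(body.query.range["@timestamp"].gte); // prints 2024-01-01
```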
## Boundaries

Never run destructive commands (such as using the `--delete-index` flag or deleting existing indices and data) without explicit user confirmation.
## Guidelines

- Never combine `--infer-mappings` with `--source-format`. Inference creates a server-side ingest pipeline that handles parsing (e.g., a CSV processor). Using `--source-format csv` parses client-side as well, causing double-parsing and an empty index. Use `--infer-mappings` alone for automatic detection, or `--source-format` with explicit `--mappings` for manual control.
- Use `--source-format csv` with `--mappings` when you want client-side CSV parsing with known field types.
- Use `--infer-mappings` alone when you want Elasticsearch to detect the format, infer field types, and create an ingest pipeline automatically.
## When NOT to Use

Consider alternatives for:

- **Real-time ingestion**: Use Filebeat or Elastic Agent
- **Enterprise pipelines**: Use Logstash
- **Built-in transforms**: Use Elasticsearch Transforms

## Additional Resources

- Common Patterns - Detailed examples for CSV loading, migrations, filtering, and more
- Troubleshooting - Solutions for common issues

## References

- Elasticsearch Mappings
- Elasticsearch Query DSL