Ray data source and sink for Elasticsearch.
Use this minimal library if you plan to read or write data from/to Elasticsearch in a massively parallel way for data processing in Ray. Internally, the library uses parallelized sliced point-in-time search for reading and parallelized bulk requests for writing, the two most efficient ways to read and write data with Elasticsearch. Note that this library does not guarantee any specific ordering of the results, though the scores are returned.
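To give an intuition of what sliced point-in-time reads look like under the hood, here is a rough sketch using the official elasticsearch client directly, assuming the 8.x client API (the index name, slice count, and page size are illustrative; ray-elasticsearch runs each slice in its own Ray read task):
from elasticsearch import Elasticsearch

INDEX = "test"    # illustrative index name
NUM_SLICES = 4    # one slice per parallel read task

es = Elasticsearch("http://localhost:9200")
# Open a point-in-time to get a consistent view of the index.
pit = es.open_point_in_time(index=INDEX, keep_alive="1m")
for slice_id in range(NUM_SLICES):
    # Each slice returns a disjoint subset of the index and could run
    # concurrently; here we loop sequentially for brevity.
    page = es.search(
        pit={"id": pit["id"], "keep_alive": "1m"},
        slice={"id": slice_id, "max": NUM_SLICES},
        size=1000,
    )
    print(slice_id, len(page["hits"]["hits"]))
es.close_point_in_time(id=pit["id"])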
Install the package from PyPI:
pip install ray-elasticsearch
This library makes use of Ray's Datasource and Datasink APIs. For reading, use ElasticsearchDatasource and, for writing, use ElasticsearchDatasink.
You can read results from a specified index by using an ElasticsearchDatasource with Ray's read_datasource(). Given an index named test that stores some numeric value in the value field, you can efficiently compute the sum of all values like so:
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource
init()
source = ElasticsearchDatasource(index="test")
res = read_datasource(source) \
    .sum("value")
print(f"Read complete. Sum: {res}")
Use an Elasticsearch query to filter the results:
source = ElasticsearchDatasource(
    index="test",
    query={
        "match": {
            "text": "foo bar",
        },
    },
)
Note that the parallel read does not enforce any ordering of the results, even though the results are scored by Elasticsearch. With the default settings, you can still access the retrieved score from the Ray Dataset's _score column.
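For example, you could use that column to keep only strong matches (a minimal sketch; the score threshold is arbitrary):
ds = read_datasource(source)
strong = ds.filter(lambda row: row["_score"] is not None and row["_score"] > 1.0)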
You do not need to set a fixed maximum concurrency level. But it can often be a good idea to limit concurrency (and hence, simultaneous requests to the Elasticsearch cluster) by setting the concurrency parameter in Ray's read_datasource():
source = ElasticsearchDatasource(index="test")
ds = read_datasource(source, concurrency=100)
Writing documents works similarly, using the ElasticsearchDatasink with Ray's write_datasink():
from ray import init
from ray.data import range
from ray_elasticsearch import ElasticsearchDatasink
init()
sink = ElasticsearchDatasink(index="test")
range(10_000) \
    .rename_columns({"id": "value"}) \
    .write_datasink(sink)
print("Write complete.")
Concurrency can again be limited by specifying the concurrency parameter in Ray's write_datasink().
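For example, reusing the sink from above:
range(10_000) \
    .rename_columns({"id": "value"}) \
    .write_datasink(sink, concurrency=100)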
By default, the data source and sink access Elasticsearch on localhost:9200, the default of the elasticsearch Python library. In most cases, however, you will want to connect to some remote Elasticsearch instance instead. To do so, specify the client settings as in the example below, using the same parameters as the Elasticsearch() constructor:
source = ElasticsearchDatasource(
    index="test",
    hosts="<HOST>",
    http_auth=("<USERNAME>", "<PASSWORD>"),
    max_retries=10,
)
All client-related keyword arguments to the ElasticsearchDatasource or ElasticsearchDatasink are passed on to the Elasticsearch() constructor. Refer to the documentation for an overview of the supported connection settings.
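The same applies to the sink; for example, a sink connecting to the same remote instance could look like this (placeholders as above):
sink = ElasticsearchDatasink(
    index="test",
    hosts="<HOST>",
    http_auth=("<USERNAME>", "<PASSWORD>"),
)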
To simplify query construction, you can also use the Elasticsearch DSL like this:
from elasticsearch_dsl import Document, Text
from elasticsearch_dsl.query import Exists
from ray_elasticsearch import ElasticsearchDatasource, ElasticsearchDatasink

class Foo(Document):
    class Index:
        name = "test_foo"

    text: str = Text()

source = ElasticsearchDatasource(
    index=Foo,
    query=Exists(field="doi"),
)
sink = ElasticsearchDatasink(index=Foo)
Note that, unlike in the Elasticsearch DSL, the results are not parsed as Python objects but are instead returned as columns of the Ray Dataset (which internally uses the Arrow format).
Any document returned from, or to be stored in, Elasticsearch consists of the actual data nested in the _source field and some metadata (e.g., _id and _index) on the top level. However, working with nested columns can sometimes be tricky with Ray (e.g., nested columns cannot be renamed easily). Because you are likely most interested in the contents of the _source field, i.e., the indexed fields of the Elasticsearch index, the ray-elasticsearch library automatically unwraps the _source field. For example, consider the following Elasticsearch record:
{
    "_index": "test",
    "_type": "_doc",
    "_id": "1",
    "_score": null,
    "_source": {
        "value": 1
    }
}
Using the default settings, the corresponding row in the Ray dataset will look like this:
{
    "_index": "test",
    "_type": "_doc",
    "_id": "1",
    "_score": None,
    "value": 1
}
You can also select the source and metadata fields explicitly, using the source_fields and meta_fields arguments:
source = ElasticsearchDatasource(
    index="test",
    source_fields=["value"],
    meta_fields=["id"],
)
With the above settings, just the ID and value will be stored in the Ray Dataset's blocks:
{
    "_id": "1",
    "value": 1
}
The metadata field prefix can be changed with the meta_prefix argument (the default is an underscore, _, just like in Elasticsearch).
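For instance, to prefix metadata columns with meta_ instead (an illustrative choice, yielding columns like meta_id):
source = ElasticsearchDatasource(
    index="test",
    meta_prefix="meta_",
)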
More examples can be found in the examples directory.
This library works fine with any of the following pip packages installed:
elasticsearch
elasticsearch7
elasticsearch8
elasticsearch-dsl
elasticsearch7-dsl
elasticsearch8-dsl
The ray-elasticsearch library will automatically detect if the Elasticsearch DSL is installed and add support for DSL-style queries accordingly.
To build this package and contribute to its development, you need to install the build, setuptools, and wheel packages:
pip install build setuptools wheel
(On most systems, these packages are already pre-installed.)
Install package and test dependencies:
pip install -e .[tests]
Verify your changes against the test suite:
ruff check . # Code formatting and linting
mypy . # Static typing
bandit -c pyproject.toml -r . # Security
pytest . # Unit tests
Please also add tests for your newly developed code.
Wheels for this package can be built with:
python -m build
If you have any problems using this package, please file an issue. We're happy to help!
This repository is released under the MIT license.