tableschema-elasticsearch-py

Generate and load Elasticsearch indexes based on Table Schema descriptors.
Features
- implements the tableschema.Storage interface
Getting Started
Installation
The package uses semantic versioning, which means that major versions could include breaking changes. It's highly recommended to specify a version range in your setup/requirements file, e.g. tableschema-elasticsearch>=1.0,<2.0.
pip install tableschema-elasticsearch
Documentation
Usage overview
import elasticsearch
import tableschema_elasticsearch

INDEX_NAME = 'testing_index'

# Connect to a local Elasticsearch instance and wrap it in a Storage
es = elasticsearch.Elasticsearch()
storage = tableschema_elasticsearch.Storage(es)

# List the existing buckets (Elasticsearch indexes)
print(list(storage.buckets))

# Create an index whose mapping is generated from a Table Schema descriptor
storage.create(INDEX_NAME, {
    'fields': [
        {
            'name': 'num',
            'type': 'number'
        }
    ]
})

# Write 1000 rows, using 'num' as the primary key
l = list(storage.write(INDEX_NAME, ({'num': i} for i in range(1000)), ['num']))
print(len(l))
print(l[:10], '...')

# Rows with an existing primary key update the stored document
l = list(storage.write(INDEX_NAME, ({'num': i} for i in range(500, 1500)), ['num']))
print(len(l))
print(l[:10], '...')

# Read everything back
storage = tableschema_elasticsearch.Storage(es)
print(list(storage.buckets))

l = list(storage.read(INDEX_NAME))
print(len(l))
print(l[:10])
In this driver, elasticsearch is used as the database wrapper. We can get a storage instance this way:
from elasticsearch import Elasticsearch
from tableschema_elasticsearch import Storage

engine = Elasticsearch()
storage = Storage(engine)
Then we can interact with the storage ('buckets' are Elasticsearch indexes in this context):
storage.buckets
storage.create('bucket', descriptor,
               reindex=False,
               always_recreate=False,
               mapping_generator_cls=None)
storage.delete('bucket')
storage.describe('bucket')
storage.iter('bucket')
storage.read('bucket')
storage.write('bucket', rows, primary_key,
              as_generator=False)
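For instance, iter returns a generator of rows while read materializes the whole bucket in memory (a minimal sketch, reusing the storage instance from above; 'bucket' stands for any existing index name):

# Stream rows one at a time
for row in storage.iter('bucket'):
    print(row)

# Or load all rows at once
rows = storage.read('bucket')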
When creating indexes, we always create an index with a semi-random name and a matching alias that points to it. This allows us to decide whether to re-index documents whenever we're re-creating an index, or to discard the existing records.
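You can see this aliasing with the plain elasticsearch-py client (a minimal sketch, assuming the 'testing_index' bucket created in the overview above; the index name suffix in the output is illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch()
# The bucket name is an alias; resolve it to the underlying semi-random index
print(es.indices.get_alias(name='testing_index'))
# e.g. {'testing_index_1a2b3c4d': {'aliases': {'testing_index': {}}}}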
Mappings
When creating indexes, the Table Schema types are converted to Elasticsearch types and a mapping is generated for the index.
Some special properties in the schema provide extra information for generating the mapping:
- array types also need to have the es:itemType property, which specifies the inner data type of the array items.
- object types also need to have the es:schema property, which provides a Table Schema for the inner document contained in that object (or have es:enabled=false to disable indexing of that field).
Example:
{
  "fields": [
    {
      "name": "my-number",
      "type": "number"
    },
    {
      "name": "my-array-of-dates",
      "type": "array",
      "es:itemType": "date"
    },
    {
      "name": "my-person-object",
      "type": "object",
      "es:schema": {
        "fields": [
          {"name": "name", "type": "string"},
          {"name": "surname", "type": "string"},
          {"name": "age", "type": "integer"},
          {"name": "date-of-birth", "type": "date", "format": "%Y-%m-%d"}
        ]
      }
    },
    {
      "name": "my-library",
      "type": "array",
      "es:itemType": "object",
      "es:schema": {
        "fields": [
          {"name": "title", "type": "string"},
          {"name": "isbn", "type": "string"},
          {"name": "num-of-pages", "type": "integer"}
        ]
      }
    },
    {
      "name": "my-user-provided-object",
      "type": "object",
      "es:enabled": false
    }
  ]
}
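A minimal sketch of putting such a descriptor to work (the bucket name 'people' and the row values are illustrative; storage is the Storage instance from the overview above):

descriptor = {'fields': [...]}  # the descriptor shown above

storage.create('people', descriptor)

# Rows are plain dicts whose keys match the schema's field names
rows = [{
    'my-number': 42,
    'my-array-of-dates': ['2020-01-01', '2020-06-15'],
    'my-person-object': {
        'name': 'Jane', 'surname': 'Doe', 'age': 33,
        'date-of-birth': '1987-03-21',
    },
    'my-library': [
        {'title': 'Moby Dick', 'isbn': '978-1503280786', 'num-of-pages': 378},
    ],
    'my-user-provided-object': {'anything': 'goes', 'since': 'it is not indexed'},
}]
list(storage.write('people', rows, ['my-number']))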
Custom mappings
By providing a custom mapping generator class (via mapping_generator_cls) inheriting from the MappingGenerator class, you should be able to customize how the mapping is generated from the descriptor.
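A minimal sketch of wiring one in (the import path below is an assumption and the subclass body is deliberately left empty; check the MappingGenerator source for the actual hooks to override):

# ASSUMPTION: MappingGenerator is importable from tableschema_elasticsearch.mappers
from tableschema_elasticsearch.mappers import MappingGenerator

class MyMappingGenerator(MappingGenerator):
    # Override the relevant MappingGenerator hooks here to adjust how
    # Table Schema types are turned into Elasticsearch mapping entries.
    pass

storage.create('bucket', descriptor,
               mapping_generator_cls=MyMappingGenerator)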
API Reference
Storage
Storage(self, es=None)
Elasticsearch Tabular Storage.
This package implements the Tabular Storage interface (see the tableschema-py documentation for the full reference); only the additional API is documented here.
Arguments
- es (object): Elasticsearch instance
storage.create
storage.create(self, bucket, descriptor, reindex=False, always_recreate=False, mapping_generator_cls=None, index_settings=None)
Create an index with a mapping generated from the schema descriptor.
Arguments
- bucket (str): Name of the index to be created
- descriptor: Descriptor of the index to be created
- reindex: On mapping mismatch, automatically create a new index and migrate the existing records to it
- always_recreate: Delete the index if it already exists (otherwise just update the mapping)
- mapping_generator_cls: subclass of MappingGenerator
- index_settings: settings which will be used in index creation
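For example (a sketch; the bucket name and settings values are illustrative, and index_settings takes standard Elasticsearch index settings):

storage.create(
    'events',
    descriptor,
    reindex=True,  # on mapping mismatch, migrate existing records to a new index
    index_settings={'number_of_shards': 1, 'number_of_replicas': 0},
)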
storage.delete
storage.delete(self, bucket=None)
Delete an index and its mapping.
Arguments
- bucket(str): Name of index to delete
Contributing
The project follows the Open Knowledge International coding standards.
The recommended way to get started is to create and activate a project virtual environment.
To install the package and development dependencies into the active environment:
$ make install
To run tests with linting and coverage:
$ make test
Changelog
Only breaking and the most important changes are described here. The full changelog and documentation for all released versions can be found in the nicely formatted commit history.
v1.0
- Initial driver implementation