
@ovotech/castle-cli
A command line wrapper around Kafka.js to transparently use Schema Registry for producing and consuming messages with Avro schema.
yarn global add @ovotech/castle-cli
castle --help
castle topic search my-topic
castle topic message my-topic --schema-file my-schema.json --message '{"field1":"value"}'
castle topic consume my-topic
If you want to connect to external kafka (and schema registry) servers you'll want to define some configuration files.
For example, if we wanted to connect to:
registry.example.com, port 2222, with username foo and password bar
broker.example.com, port 3333, with specific tls keys and certs
The config would be a json file that looks like this:
{
"schemaRegistry": {
"uri": "https://foo:bar@registry.example.com:2222"
},
"kafka": {
"brokers": ["broker.example.com:3333"],
"ssl": {
"ca": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n",
"cert": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n",
"key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
}
}
}
The "schemaRegistry" config object is the configuration you'll send to SchemaRegistry from @ovotech/avro-kafkajs. Here are some docs for all the possible arguments: https://github.com/ovotech/castle/tree/main/packages/avro-kafkajs#using-schema-registry-directly
The "kafka" config object is the config for kafkajs itself, which you can read about in their excellent documentation: https://kafka.js.org/docs/configuration
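As a rough illustration of how the two halves of the config fit together, here is a minimal TypeScript sketch that builds the same structure as the JSON example. The type and function names (CastleConfig, RegistryTarget, registryUri, buildConfig) are hypothetical and not part of castle-cli; only the field names come from the example. Percent-encoding the credentials is an assumption about how the registry client treats the URI.

```typescript
// Hypothetical helper types; only the field names mirror the JSON example.
interface TlsFiles {
  ca: string; // PEM text of the CA certificate
  cert: string; // PEM text of the client certificate
  key: string; // PEM text of the private key
}

interface CastleConfig {
  schemaRegistry: { uri: string };
  kafka: { brokers: string[]; ssl?: TlsFiles };
}

interface RegistryTarget {
  host: string;
  port: number;
  username?: string;
  password?: string;
}

// Build the registry URI. Credentials are percent-encoded so that characters
// such as "@" or ":" cannot break the URL (assumption: the client decodes them).
function registryUri({ host, port, username, password }: RegistryTarget): string {
  const auth =
    username !== undefined && password !== undefined
      ? `${encodeURIComponent(username)}:${encodeURIComponent(password)}@`
      : '';
  return `https://${auth}${host}:${port}`;
}

// Assemble the config object; the ssl block is left out when no TLS material is given.
function buildConfig(registry: RegistryTarget, brokers: string[], ssl?: TlsFiles): CastleConfig {
  return {
    schemaRegistry: { uri: registryUri(registry) },
    kafka: ssl ? { brokers, ssl } : { brokers },
  };
}

const config = buildConfig(
  { host: 'registry.example.com', port: 2222, username: 'foo', password: 'bar' },
  ['broker.example.com:3333'],
);

// The printed JSON could be saved as a config file and passed to --config.
console.log(JSON.stringify(config, null, 2));
```

For a TLS setup you would pass the PEM contents as the third argument, which produces the "ssl" block shown in the JSON example.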
When you have a file like that saved somewhere, you can use it with the --config option:
castle topic search my-topic --config ~/path/to/config.json
You can also store the config inside the ~/.castle-cli folder, which is searched when you use the --config option:
castle topic search my-topic --config config.json
You can also define the configuration directly using the config commands
castle config set my-env \
--kafka-broker broker.example.com:3333 \
--key private.pem \
--ca ca.pem \
--cert cert.pem \
--schema-registry https://foo:bar@registry.example.com:2222
castle topic search my-topic --config my-env
To consume and produce messages in kafka topics, as well as inspect and modify the topics themselves, you can use the topic group of commands.
You can see all of the available commands with help.
Show partition, offsets and config entries of a topic.
castle topic show my-topic
castle topic show my-topic -vv
castle topic show my-topic --json
Options:
-J, --json - output as json
-C, --config <configFile> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
Update config entries of a topic. All the available topic configurations can be found in the Confluent documentation: https://docs.confluent.io/current/installation/configuration/topic-configs.html
castle topic update my-topic --config-entry file.delete.delay.ms=40000
castle topic update my-topic --config-entry file.delete.delay.ms=40000 -vv
Get a list of topics. If you don't specify a search string, all of them are returned.
Options:
-J, --json - output as json
-C, --config <configFile> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
castle topic search
castle topic search my-to
castle topic search my-to -vv
castle topic search my-topic --json
Create a topic. You can specify the number of partitions, the replication factor and config entries.
Options:
-P, --num-partitions <partitions> - number of partitions (default: 1)
-R, --replication-factor <factor> - replication factor (default: 1)
-E, --config-entry <entry> - set a config entry, title=value, can use multiple times (default: [])
-C, --config <config> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
castle topic create my-topic
castle topic create my-topic -vvvv
castle topic create my-topic --num-partitions 2 --replication-factor 2 --config-entry file.delete.delay.ms=40000
Consume messages of a topic. Uses the schema registry to decode avro messages. By default it uses a new random consumer group id to retrieve all the messages and then exits.
With the --json option the result is output as json, which can then be used by the "castle topic produce" command.
Options:
-D, --depth <depth> - depth for the schemas output (default: 5)
-G, --group-id <groupId> - consumer group id, defaults to random uuid (default: "random group name")
-T, --tail - start listening for new events
-J, --json - output as json
-K, --encoded-key - decode the key with avro schema too
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-C, --config <configFile> - config file with connection details
-h, --help - output usage information
castle topic consume my-topic
castle topic consume my-topic -vvvv
castle topic consume my-topic --tail
castle topic consume my-topic --group-id my-group-id
castle topic consume my-topic --json
Produce messages for a topic. Using a file that contains schema, topic and messages to be produced. Schemas for keys are supported with the "keySchema" field in the produce file.
Options:
-C, --config <configFile> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
castle topic produce my-produce-file.json
castle topic produce my-produce-file.json -vv
Example produce file:
{
"topic": "my-topic",
"schema": {
"name": "Event",
"type": "record",
"fields": [{ "name": "field1", "type": "string" }]
},
"messages": [{ "partition": 0, "value": { "field1": "test1" } }]
}
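If you need more than a handful of messages, it can be easier to generate the produce file than to write it by hand. The following is a small TypeScript sketch that wraps plain values into the structure of the example above. The names (RecordSchema, ProduceFile, buildProduceFile) are hypothetical and not part of castle-cli, and the sketch only covers the fields shown in the example.

```typescript
// Hypothetical types; the field names follow the example produce file.
interface AvroField {
  name: string;
  type: string;
}

interface RecordSchema {
  name: string;
  type: 'record';
  fields: AvroField[];
}

interface ProduceFile<T> {
  topic: string;
  schema: RecordSchema;
  messages: { partition: number; value: T }[];
}

// Wrap plain values into the produce-file structure, all on one partition.
function buildProduceFile<T>(
  topic: string,
  schema: RecordSchema,
  values: T[],
  partition = 0,
): ProduceFile<T> {
  return {
    topic,
    schema,
    messages: values.map((value) => ({ partition, value })),
  };
}

const produceFile = buildProduceFile(
  'my-topic',
  { name: 'Event', type: 'record', fields: [{ name: 'field1', type: 'string' }] },
  [{ field1: 'test1' }, { field1: 'test2' }],
);

// JSON text that could be saved as my-produce-file.json
const produceFileJson = JSON.stringify(produceFile, null, 2);
console.log(produceFileJson);
```

Saving the printed JSON to a file gives you something you can pass to castle topic produce.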
Produce an ad-hoc message for a topic. You need to specify schema file (with --schema-file) and message content as json (--message). If you define --key-schema-file as well you can encode your keys too.
Options:
-P, --partition <partition> - the partition to send this on
-K, --key <key> - message key
-M, --message <message> - the JSON message to be sent
-S, --schema-file <schema> - path to the schema file
-E, --key-schema-file <schema> - optional path to the key schema file
-C, --config <config> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
castle topic message my-topic --schema-file my-schema.json --message '{"text":"other"}'
castle topic message my-topic --schema-file my-schema.json --message '{"text":"other"}' -vvvv
To search for schemas in the schema registry you can use the schema group of commands.
You can see all of the available commands with help.
Search for schemas with the given name in the schema registry. If you don't specify a search string, all of them are returned.
Options:
-J, --json - output as json
-C, --config <configFile> - config file with connection details
-h, --help - output usage information
castle schema search
castle schema search my-to
castle schema search my-topic --json
Show all the versions of a schema in the schema registry.
Options:
-D, --depth <depth> - depth for the schemas output (default: 5)
-J, --json - output as json
-C, --config <configFile> - config file with connection details
-h, --help - output usage information
castle schema show my-topic --depth 7
castle schema show my-topic --json
If you want to modify consumer groups and their offsets, you can do that with the group commands.
Show consumer group offsets for a topic. Break it down by partition and calculate current lag (difference between current and latest offset)
Options:
-J, --json - output as json
-C, --config <configFile> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
castle group show my-group-id my-topic
castle group show my-group-id my-topic -vv
castle group show my-group-id my-topic --json
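To make the lag calculation concrete, here is a minimal TypeScript sketch of the arithmetic: per partition, lag is the latest offset minus the group's current offset. The PartitionOffsets shape and the function names are hypothetical and do not reflect the CLI's actual output format. Plain numbers are used for readability, although Kafka offsets are 64-bit integers, and clamping negative values to zero is a choice made for this sketch.

```typescript
// Hypothetical shape for one partition's offsets; not the CLI's output format.
interface PartitionOffsets {
  partition: number;
  groupOffset: number; // offset the consumer group has committed
  latestOffset: number; // latest offset available in the partition
}

// Lag is how far the group's offset trails the partition's latest offset.
// Negative differences are clamped to zero (a choice made for this sketch).
function partitionLag({ groupOffset, latestOffset }: PartitionOffsets): number {
  return Math.max(0, latestOffset - groupOffset);
}

// Sum the lag across all partitions of the topic.
function totalLag(offsets: PartitionOffsets[]): number {
  return offsets.reduce((sum, item) => sum + partitionLag(item), 0);
}

const sample: PartitionOffsets[] = [
  { partition: 0, groupOffset: 90, latestOffset: 100 },
  { partition: 1, groupOffset: 40, latestOffset: 40 },
];

// Partition 0 is 10 messages behind, partition 1 is fully caught up.
console.log(sample.map((item) => ({ partition: item.partition, lag: partitionLag(item) })));
console.log(totalLag(sample));
```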
Update consumer group offsets for a topic. Requires either --reset-offsets or --set-offset options.
Options:
-R, --reset-offsets <earliest|latest> - reset consumer group offset
-E, --set-offset <entry> - set an offset, partition=offset, can use multiple times (default: [])
-C, --config <configFile> - config file with connection details
-v, --verbose - output logs for kafka, four levels: error, warn, info, debug. Use the flag multiple times to increase the level
-h, --help - output usage information
castle group update my-group-id my-topic --reset-offsets earliest
castle group update my-group-id my-topic --reset-offsets earliest -vv
castle group update my-group-id my-topic --set-offset 0=10 --set-offset 1=10
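Each --set-offset entry pairs a partition with the offset it should be moved to. As an illustration of that partition=offset format, the TypeScript sketch below parses entries into a list of partition/offset pairs. This is not castle-cli's own parser: the names are hypothetical, and accepting only non-negative integers is an assumption.

```typescript
// Hypothetical parsed form of one --set-offset entry.
interface PartitionOffset {
  partition: number;
  offset: string; // kept as a string, since Kafka offsets are 64-bit integers
}

// Parse "partition=offset", accepting only non-negative integers
// (an assumption made for this sketch).
function parseSetOffset(entry: string): PartitionOffset {
  const match = /^(\d+)=(\d+)$/.exec(entry);
  if (match === null) {
    throw new Error(`Expected partition=offset, got "${entry}"`);
  }
  return { partition: Number(match[1]), offset: match[2] };
}

// Equivalent to passing: --set-offset 0=10 --set-offset 1=10
const parsedOffsets = ['0=10', '1=10'].map((entry) => parseSetOffset(entry));
console.log(parsedOffsets);
```

Rejecting malformed entries up front means a typo such as 0:10 is reported before anything is sent to the broker.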
You can run the tests with:
yarn test
Style is maintained with prettier and eslint
yarn lint
Deployment is performed by lerna automatically on merge / push to main, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry.
Have a bug? File an issue with a simple example that reproduces this so we can take a look & confirm.
Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see test folder).
This project is licensed under Apache 2 - see the LICENSE file for details