Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

dvgroup-factory

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

dvgroup-factory

Service factory for DVGroup

  • 0.0.60
  • PyPI
  • Socket score

Maintainers
1

#Состав библиотеки

  1. Класс Factory для получения экземпляров сервисов без указания путей к сервисам и секретов. Для работы нужно сконфигурировать переменные окружения VAULT_URL и VAULT_TOKEN, или явно передать url и token при создании экземпляра класса). Далее секреты автоматически достаются из vault, пути из consul
  2. Декоратор @log(logger=you_logger)
  3. Декоратор @retry(count=10, sleep=0.5)

Установка


Requests is available on PyPI:
$ python -m pip install dvgroup_factory
from dvgroup_factory import factory

##Порядок работы:

  1. Получить объект фабрики:
    1. fc = factory.Factory(vault_url=url, vault_token=token)
    2. 2.1 fc = factory.Factory(), если определены переменные окружения VAULT_URL и VAULT_TOKEN
  2. Получить объект сервиса (в kwargs передаются параметры не связанные с url и secrets):
    1. ch_client = fc.clickhouse_client(secure=True, database="db1", verify=False)
    2. kafka_p = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
  3. По умолчанию, если ранее уже был создан объект сервиса, то при следующем запросе, будет возвращен ранее созданный.
  4. Для получения нового объекта (если ранее уже был получен экземпляр), требуется переддать параметр new=True:
    1. kafka_p2 = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), new=True)
  5. Для получения именованного экземпляра требуется указать параметр instance_name
    1. kafka_p2 = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), instance_name="Name")
  6. Для понимания, какие методы (классы сервисов) реализованы, следует вызвать метод info(), который возвратит след.информацию:

Создание экземпляра:\

ins = Factory(vault_url=url, vault_token=token)\

Методы:
1 ins.vault_client(url: str, token: str)
2 ins.consul(**kwargs)
3 ins.kafka_producer(**kwargs)
4 ins.kafka_consumer(*topics,**kwargs)
3 ins.aiokafka_producer(**kwargs)
4 ins.aiokafka_consumer(*topics,**kwargs)
5 ins.clickhouse_client(**kwargs)
6 ins.azure_container_client(**kwargs)
7 ins.loki_handler(**kwargs)
8 ins.gp_connection()\ Для создания нового экземпляра укажите в kwargs: new=True
Пути настроек в consul:
{"clickhouse": "env/databases/clickhouse", "kafka": "env/databases/kafka", "ms-azure-se": "env/databases/ms-azure-se", "loki": "env/databases/loki"}

#Пример кода:

from dvgrop_factory import factory as fc

###Получаю экземпляр фабрики factory = fc.Factory()

###Consul consul = factory.consul()
kafka_config = consul.kv["env/databases/kafka"]

###Clickhouse ch = factory.clickhouse_client(database="db1", ca_certs="CA.pem")
rs = ch.execute("SELECT COUNT(*) FROM db1.atol")

###KafkaProducer k_p = factory.kafka_producer()

###KafkaConsumer k_c = factory.kafka_consumer()

###azure.storage.blob.ContainerClient a_cc = factory.azure_container_client(container_name="output")
print(f'k_c = {a_cc}')

###azure.storage.blob.BlobCliennt a_cc = factory.azure_blob_client(container_name="output", blob_name = "nm")

###logging_loki.LokiHandler loki = factory.loki_handler(tags={"application": "atol-connector"}, version="1")
loki.setLevel(logging.DEBUG)
_log_format = f"%(asctime)s - [%(levelname)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"
loki.setFormatter(logging.Formatter(_log_format))
logger = logging.getLogger('segments-api')
logger.addHandler(loki)

###GreenPlum Connection conn = factory.gp_connection() cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM raw_atol') rs = cursor.fetchone()

###AIOKafka async def aiostart():
consumer = factory.aiokafka_consumer("test-atol1", auto_offset_reset='earliest', enable_auto_commit=False, )
producer = factory.aiokafka_producer()
await consumer.start()
await producer.start()
try:
future = await producer.send("test-atol", value={"ASYNC": "start"})
#record_metadata = await future
key = None
async for msg in consumer:
print(f"async key {key} msg = {msg}")
msg.value["consumer-producer"] = True
msg.value["ASYNC"] = True
print(f"async msg = {msg}")
future = await producer.send("test-atol1", value=msg.value)
finally:
await consumer.stop()
await producer.stop()\

ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(aiostart())]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc