simple-mongodb

An asynchronous Python package for streamlined collection management in MongoDB

Version: 0.9.0 (pip, PyPI)
Maintainers: 1

Simple-MongoDB

Warning: This Python package is still in development.

Description

Placeholder

Installation

pip install simple-mongodb

Simple Example

import asyncio
from typing import Any

from bson import ObjectId
from pydantic import BaseModel

from simple_mongodb import BaseCollection, MongoDBClient


class AccountCollection(BaseCollection):
    db = 'my-db'  # The name of the database; alternatively, set the environment variable MONGODB_DB
    collection = 'account-collection'  # The name of the collection


class Account(BaseModel):
    name: str


async def main() -> None:
    # Initialize a client object and pass the url, or set the environment variables
    #   MONGODB_HOST, MONGODB_PORT,
    #   MONGODB_USERNAME, MONGODB_PASSWORD
    # If neither the url parameter nor the environment variables are set, the default values are used
    client: MongoDBClient = MongoDBClient(url='mongodb://user:pass@host:27017')

    # Initialize the account collection
    account_collection: AccountCollection = AccountCollection(client=client)

    account: Account = Account(name='example-name')

    try:
        # Insert the document in the collection
        document: dict[str, Any] = account.model_dump()
        inserted_id: ObjectId = await account_collection.insert_one(document=document)

        # Find the document
        where: dict[str, Any] = {'_id': inserted_id}
        # Reassign without a second annotation so type checkers do not report a redefinition
        document = await account_collection.find_one(where=where)

        # Update the document
        update: dict[str, Any] = {'$set': {'name': 'other-name'}}
        # Returns the id of the new document if upsert=True
        await account_collection.update_one(where=where, update=update, upsert=False)

    except account_collection.InsertError:
        pass
    except account_collection.FindError:
        pass
    except account_collection.UpdateError:
        pass
    except account_collection.ServerTimeoutError:
        pass

    finally:
        # Close the db connection, even if an unexpected error was raised
        client.close()


if __name__ == '__main__':
    asyncio.run(main())
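
The comments in the example mention the environment variables MONGODB_HOST, MONGODB_PORT, MONGODB_USERNAME and MONGODB_PASSWORD as an alternative to the url parameter. As a rough illustration of how such variables map onto a connection URL, the sketch below assembles one by hand using only the standard library. The helper name build_mongodb_url is hypothetical and not part of simple_mongodb, and the fallback values (localhost, 27017) are assumed from MongoDB's conventional defaults; the package's own defaults and internal handling may differ.

```python
import os
from typing import Mapping, Optional
from urllib.parse import quote_plus


def build_mongodb_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Build a MongoDB URL from MONGODB_* variables.

    Hypothetical helper for illustration only; not part of simple_mongodb.
    """
    if env is None:
        env = os.environ

    # Assumed fallbacks (MongoDB's conventional host and port);
    # the package's own default values may differ.
    host = env.get('MONGODB_HOST', 'localhost')
    port = env.get('MONGODB_PORT', '27017')
    username = env.get('MONGODB_USERNAME')
    password = env.get('MONGODB_PASSWORD')

    if username and password:
        # Percent-encode credentials so characters such as '@' or ':'
        # do not break the URL structure
        credentials = f'{quote_plus(username)}:{quote_plus(password)}@'
    else:
        credentials = ''

    return f'mongodb://{credentials}{host}:{port}'


if __name__ == '__main__':
    print(build_mongodb_url({
        'MONGODB_HOST': 'db.example.com',
        'MONGODB_USERNAME': 'user',
        'MONGODB_PASSWORD': 'p@ss',
    }))
```

The resulting string could then be passed as the url argument of MongoDBClient, as in the example above. Percent-encoding the credentials matters when a password contains reserved characters.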
