10. Mongo

Nemesyst MongoDB abstraction/handler. This handler abstracts some pymongo functionality to make it easier to use a MongoDB database for deep learning purposes.

10.1. Example usage

Below is an in-code example: a unit test that exercises all functionality. You can override the options by passing a dictionary to the constructor, or by passing keyword arguments to the functions that use them:

def _mongo_unit_test():
    """Unit test of MongoDB compat."""
    import datetime
    import pickle
    import time  # needed for the time.sleep() calls below
    # create Mongo object to use
    db = Mongo({"test2": 2, "db_port": "65535"})
    # testing magic functions
    db["test2"] = 3  # set item
    db["test2"]  # get item
    len(db)  # len
    del db["test2"]  # del item
    # output current state of Mongo
    db.debug()
    # stop any active databases already running at the db path location
    db.stop()
    # hold for 2 seconds to give any running db time to shut down
    time.sleep(2)
    # attempt to initialise the database, as in create the database with users
    db.init()
    # hold to let the new unauthenticated db launch
    time.sleep(2)
    # start the authenticated db; you will now need a username and password for access
    db.start()
    # warm up time for new authentication db
    time.sleep(2)
    # create a connection to the database so we can do database operations
    db.connect()
    db.debug()
    # import data into mongodb debug collection
    db.dump(db_collection_name="test", data={
        "string": "99",
        "number": 99,
        "binary": bin(99),
        "subdict": {"hello": "world"},
        "subarray": [{"hello": "worlds"}, {"hi": "jim"}],
        "timedate": datetime.datetime.utcnow(),
    })
    # testing gridfs insert item into database
    db.dump(db_collection_name="test", data=(
        {"utctime": datetime.datetime.utcnow()},
        b"some_test_string"
        # pickle.dumps("some_test_string")
    ))
    # log into the database so user can manually check data import
    db.login()
    # attempt to retrieve the data that exists in the collection as a cursor
    c = db.getCursor(db_collection_name="test", db_pipeline=[{"$match": {}}])
    # iterate through the data in batches to minimise requests
    for dataBatch in db.getBatches(db_batch_size=32, db_data_cursor=c):
        print("Returned number of documents:", len(dataBatch))
    # define a pipeline to get the IDs of the 5 latest gridfs files in a given collection
    fs_pipeline = [{'$sort': {'uploadDate': -1}},
                   {'$limit': 5},
                   {'$project': {'_id': 1}}]
    # get a cursor to get us the ID of files we desire
    fc = db.getCursor(db_collection_name="test.files", db_pipeline=fs_pipeline)
    # use cursor and get files to collect our data in batches
    for batch in db.getFiles(db_batch_size=2, db_data_cursor=fc):
        for doc in batch:
            # now read the gridout object
            print(doc["gridout"].read())
    # finally close out database
    db.stop()

This unit test also briefly shows how to use gridfs by dumping tuple items of the form (dict(), object), where the dict becomes the file's metadata and the object is some form of the data that can be serialized into the database.
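
As a rough illustration of the (dict(), object) form described above, the sketch below builds such a tuple from an arbitrary Python object using pickle, in the spirit of the commented-out pickle.dumps line in the unit test. The helper name make_gridfs_payload and the example metadata keys are hypothetical and not part of Nemesyst.

```python
import datetime
import pickle


def make_gridfs_payload(obj, metadata=None):
    """Return a (metadata, bytes) tuple (hypothetical helper, not Nemesyst API)."""
    meta = dict(metadata or {})
    # record a timezone-aware UTC timestamp unless the caller supplied one
    meta.setdefault("utctime", datetime.datetime.now(datetime.timezone.utc))
    # pickle serializes an arbitrary Python object into bytes
    return meta, pickle.dumps(obj)


payload = make_gridfs_payload({"weights": [0.1, 0.2]}, {"name": "example"})
# reading the bytes back restores an equal object
restored = pickle.loads(payload[1])
```

A tuple like payload could then be passed as the data argument to dump, as the unit test does with its own tuple. Only unpickle bytes from sources you trust, since pickle.loads can execute arbitrary code.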

Warning

Mongo uses subprocess.Popen in init, start, and stop, since these calls would otherwise block Nemesyst, paired with time.sleep() to wait for the database to start up and shut down. Depending on the size of your database, you may need to lengthen the time.sleep() durations, as larger databases take longer to start up and shut down.
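
If fixed sleeps prove fragile, one possible alternative is to poll the database port until it accepts TCP connections. The sketch below is an assumption about how this could be done with the standard library; wait_for_port is a hypothetical helper and is not something Nemesyst provides.

```python
import socket
import time


def wait_for_port(host, port, timeout=10.0, interval=0.2):
    """Return True once host:port accepts TCP connections, False on timeout.

    Hypothetical helper, not part of Nemesyst.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # a successful connect means something is listening on the port
            with socket.create_connection((host, int(port)), timeout=interval):
                return True
        except OSError:
            # nothing is listening yet: wait briefly and retry
            time.sleep(interval)
    return False
```

A call such as wait_for_port("localhost", "65535") after db.start() could then stand in for a fixed time.sleep(2). An open port only shows that the server is listening, not that it has finished any internal recovery, so a short extra delay may still be needed.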

10.2. API

class mongo.Mongo(args: dict = None, logger: print = None)

Python 2/3 compatible MongoDB utility wrapper.

This wrapper saves its state in an internal, overridable dictionary so that you can adapt it to your requirements should you need to do something unique; the caveat is that it becomes harder to read.

Parameters
  • args (dictionary) – Dictionary of overrides.

  • logger (function address) – Function address to print/log to (default: print).

Example

Mongo({"db_user_name": "someUsername", "db_password": "somePassword"})

Example

Mongo()
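
To make the idea of an overridable state dictionary more concrete, the sketch below shows one way such a pattern could work: defaults are merged with constructor overrides, and per-call keyword arguments win only when they are not None. This is an illustration under stated assumptions, not the actual Mongo implementation; the class OverridableState, its resolve method, and the default values are all hypothetical.

```python
class OverridableState:
    """Hypothetical illustration of a dictionary-backed, overridable state."""

    def __init__(self, args=None):
        # assumed defaults for illustration only; Nemesyst's may differ
        defaults = {"db_ip": "localhost", "db_port": "27017"}
        self.args = dict(defaults)
        self.args.update(args or {})

    def __getitem__(self, key):
        return self.args[key]

    def __setitem__(self, key, value):
        self.args[key] = value

    def __delitem__(self, key):
        del self.args[key]

    def __len__(self):
        return len(self.args)

    def resolve(self, **overrides):
        """Merge stored state with per-call overrides that are not None."""
        merged = dict(self.args)
        merged.update({k: v for k, v in overrides.items() if v is not None})
        return merged


state = OverridableState({"db_port": "65535"})
per_call = state.resolve(db_port="1234", db_ip=None)
```

In this sketch the per-call override affects only the returned dictionary; the stored state keeps the constructor value.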

connect(db_ip: str = None, db_port: str = None, db_authentication: str = None, db_authentication_database=None, db_user_name: str = None, db_password: str = None, db_name: str = None, db_replica_set_name: str = None, db_replica_read_preference: str = None, db_replica_max_staleness: str = None, db_tls: bool = None, db_tls_ca_file: str = None, db_tls_certificate_key_file: str = None, db_tls_certificate_key_file_password: str = None, db_tls_crl_file: str = None, db_collection_name: str = None) → pymongo.database.Database

Connect to a specific MongoDB database.

This sets the internal db client, which is necessary to connect to and use the associated database. Without it, operations such as dump into the database will fail. This is replica set capable.

Parameters
  • db_ip (string) – Database hostname or ip to connect to.

  • db_port (string) – Database port to connect to.

  • db_authentication (string) – The authentication method to use on db.

  • db_user_name (string) – Username to use for authentication to db_name.

  • db_password (string) – Password for db_user_name in database db_name.

  • db_name (string) – The name of the database to connect to.

  • db_replica_set_name (string) – Name of the replica set to connect to.

  • db_replica_read_preference (string) – Which replica type to prefer reads from.

  • db_replica_max_staleness (string) – Maximum number of seconds a replica is allowed to lag behind.

  • db_tls (bool) – Use TLS for the db connection.

  • db_tls_certificate_key_file (string) – Certificate and key file for TLS.

  • db_tls_certificate_key_file_password (string) – Password for the certificate and key file.

  • db_tls_crl_file (string) – Certificate revocation list file path.

  • db_collection_name (string) – GridFS collection to use.

Returns

database client object

Return type

pymongo.database.Database
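
For orientation, the sketch below shows how a handful of these parameters could map onto a standard MongoDB connection URI. It is not necessarily how connect builds its client internally; build_mongo_uri is a hypothetical helper, and the database name and credentials are placeholders.

```python
from urllib.parse import quote_plus, urlencode


def build_mongo_uri(db_ip, db_port, db_name, db_user_name=None,
                    db_password=None, db_replica_set_name=None, db_tls=None):
    """Build a mongodb:// URI (hypothetical helper, not Nemesyst API)."""
    credentials = ""
    if db_user_name is not None and db_password is not None:
        # reserved characters in credentials must be percent-encoded
        credentials = "{}:{}@".format(quote_plus(db_user_name),
                                      quote_plus(db_password))
    options = {}
    if db_replica_set_name:
        options["replicaSet"] = db_replica_set_name
    if db_tls is not None:
        options["tls"] = "true" if db_tls else "false"
    query = "?" + urlencode(options) if options else ""
    return "mongodb://{}{}:{}/{}{}".format(
        credentials, db_ip, db_port, db_name, query)


uri = build_mongo_uri("localhost", "65535", "nemesyst",
                      db_user_name="user", db_password="p@ss/word",
                      db_replica_set_name="rs0", db_tls=True)
```

Percent-encoding the username and password matters because characters such as @ and / would otherwise be read as URI delimiters.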

debug() → None

Log function to help track the internal state of the class.

Simply logs the working state of the args dict.

dump(db_collection_name: str, data: dict, db: pymongo.database.Database = None) → None

Import data dictionary into database.

Parameters
  • db_collection_name (string) – Collection name to import into.

  • data (dictionary) – Data to import into database.

  • db (pymongo.database.Database) – Database to import data into.

Example

dump(db_collection_name="test", data={"subdict": {"hello": "world"}})

getBatches(db_batch_size: int = None, db_data_cursor: pymongo.command_cursor.CommandCursor = None) → list

Get database cursor data in batches.

Parameters
  • db_batch_size (integer) – The number of items to return in a single round.

  • db_data_cursor (command_cursor.CommandCursor) – The cursor to use to retrieve data from db.

Returns

yields a list of items requested.

Return type

list of dicts

Todo

Desperately needs a rewrite and correction of a bug: the last value always fails. I want this in a magic function too to make it easy.
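
As a point of comparison for the rewrite mentioned in the todo, the sketch below shows one common way to batch any iterable, including a final partial batch, using itertools.islice. It is not the library's getBatches implementation; iter_batches is a hypothetical name, and a plain range stands in for a database cursor.

```python
from itertools import islice


def iter_batches(items, batch_size):
    """Yield lists of up to batch_size items from any iterable.

    Hypothetical helper, not part of Nemesyst.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice takes at most batch_size items and stops cleanly at the end
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# a range stands in for a cursor; the last batch holds the leftover item
batches = list(iter_batches(range(7), 3))
```

Because pymongo cursors are iterable, the same approach should in principle apply to a cursor returned by getCursor, although that is untested here.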

getCursor(db: pymongo.database.Database = None, db_pipeline: list = None, db_collection_name: str = None) → pymongo.command_cursor.CommandCursor

Use aggregate pipeline to get a data-cursor from the database.

This cursor is what mongodb provides to allow you to request the data from the database in a manner you control, instead of just getting a big dump from the database.

Parameters
  • db_pipeline (list of dicts) – MongoDB aggregate pipeline used to transform and retrieve the data as you request.

  • db_collection_name (str) – The collection name which we will pull data from using the aggregate pipeline.

  • db (pymongo.database.Database) – Database object to operate pipeline on.

Returns

Command cursor to fetch the data with.

Return type

pymongo.command_cursor.CommandCursor
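
An aggregate pipeline is a plain list of stage dictionaries applied in order, so it can be built with ordinary Python. The sketch below assembles a pipeline from the standard $match, $sort, and $limit stages; the helper latest_documents_pipeline is hypothetical, and the field name is borrowed from the unit test purely for illustration.

```python
def latest_documents_pipeline(sort_field, limit, match=None):
    """Build a pipeline returning the newest documents first.

    Hypothetical helper, not part of Nemesyst.
    """
    return [
        # keep only matching documents; an empty filter matches everything
        {"$match": match or {}},
        # -1 sorts descending, so the largest (latest) values come first
        {"$sort": {sort_field: -1}},
        # cap the number of documents passed on
        {"$limit": limit},
    ]


pipeline = latest_documents_pipeline("timedate", 5)
```

A list like pipeline could then be passed as db_pipeline to getCursor, in the same way the unit test passes its own pipelines.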

getFiles(db_batch_size: int = None, db_data_cursor: pymongo.command_cursor.CommandCursor = None, db_collection_name: str = None, db: pymongo.database.Database = None) → list

Get gridfs files from MongoDB by id, using a cursor over the .files collection.

Parameters
  • db_batch_size (integer) – The number of items to return in a single round.

  • db_data_cursor (command_cursor.CommandCursor) – The cursor to use to retrieve data from db.

  • db_collection_name (str) – The top-level collection name, not including .chunks or .files, where gridfs is to operate.

  • db (pymongo.database.Database) – Database object to operate pipeline on.

Returns

yields a list of tuples containing (item requested, metadata).

init(db_path: str = None, db_log_path: str = None, db_log_name: str = None, db_config_path: str = None) → None

Initialise the database.

This includes ensuring that the db path and db log path exist, creating the DB files, and adding an authentication user. All of this should be done on a localhost port so that the unprotected database is never exposed.

Parameters
  • db_path (string) – Desired directory of MongoDB database files.

  • db_log_path (string) – Desired directory of MongoDB log files.

  • db_log_name (string) – Desired name of log file.

  • db_config_path (string) – Config file to pass to MongoDB.

login(db_port: str = None, db_user_name: str = None, db_password: str = None, db_name: str = None, db_ip: str = None) → None

Log in to the database; this interrupts execution and makes the database available via the CLI.

Parameters
  • db_port (string) – Database port to connect to.

  • db_user_name (string) – Database user to authenticate as.

  • db_password (string) – User password to authenticate with.

  • db_name (string) – Database to authenticate to, the authentication db.

  • db_ip (string) – Database ip to connect to.

start(db_ip: str = None, db_port: str = None, db_path: str = None, db_log_path: str = None, db_log_name: str = None, db_cursor_timeout: int = None, db_config_path: str = None, db_replica_set_name: str = None) → subprocess.Popen

Launch an on-machine database with authentication.

Parameters
  • db_ip (list) – List of IPs to accept connections from.

  • db_port (string) – Port desired for database.

  • db_path (string) – Path to parent dir of database.

  • db_log_path (string) – Path to parent dir of log files.

  • db_log_name (string) – Desired base name for log files.

  • db_cursor_timeout (integer) – Set timeout time for unused cursors.

  • db_config_path (string) – Config file path to pass to MongoDB.

Return type

subprocess.Popen

Returns

Subprocess of running MongoDB.
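
To give a feel for what launching such a database involves, the sketch below assembles a mongod argument list from a few of these parameters using standard mongod options. It is an assumption about the general shape of the command, not the exact command Nemesyst runs; build_mongod_command and the example paths are hypothetical, and nothing is executed.

```python
import os


def build_mongod_command(db_ip, db_port, db_path, db_log_path, db_log_name,
                         db_replica_set_name=None):
    """Assemble a mongod argument list (hypothetical helper, not Nemesyst API)."""
    command = [
        "mongod",
        # mongod accepts a comma-separated list of addresses to bind to
        "--bind_ip", ",".join(db_ip),
        "--port", str(db_port),
        "--dbpath", db_path,
        "--logpath", os.path.join(db_log_path, db_log_name),
        # require clients to authenticate
        "--auth",
    ]
    if db_replica_set_name:
        command += ["--replSet", db_replica_set_name]
    return command


command = build_mongod_command(["localhost"], "65535", "/data/db",
                               "/data/log", "mongo.log")
# subprocess.Popen(command) would launch it without blocking (not run here)
```

Passing the arguments as a list rather than a single shell string avoids quoting problems with paths that contain spaces.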

stop(db_path=None) → subprocess.Popen

Stop a running local database.

Parameters

db_path (string) – The path to the database to shut down.

Returns

Subprocess of database closer.

Return type

subprocess.Popen