How to insert-or-replace an entity in Cosmos DB atomically?


In Cosmos DB, I have a container that stores a set of product types. Each product type has a unique type value, so type can serve as the identity of a product type.

Here is an example of product type:

{
    "type": "type1",
    "introduction": "this is type 1",
    "size": 10
}

I have a process that inserts or replaces product types. This process should be atomic and transactional: if the container has no item with type = "type3" and multiple processes try to insert { "type": "type3", "introduction": "this is type 3" } at the same time, there should be only one item with type = "type3" after all processes have finished.

So I use a stored procedure to do this insert-or-replace logic:

function addOrUpdateEntity(entity) {
    var documentQuery = {
        'query': 'SELECT * FROM c WHERE c.type = @param',
        'parameters': [{ 'name': '@param', 'value': entity.type }]
    };

    __.queryDocuments(__.getSelfLink(), documentQuery,
        function (err, items) {
            // Surface query failures instead of reading items from a failed call.
            if (err) { throw err; }
            if (items.length === 0) {
                // No item with this type yet: create it.
                __.createDocument(__.getSelfLink(), entity, {}, function (err, item, options) {
                    if (err) { throw err; }
                });
            } else if (items.length === 1) {
                // Exactly one match: copy it and overwrite the introduction.
                const oldEntity = items[0];
                const newEntity = Object.assign({}, oldEntity);
                newEntity.introduction = entity.introduction;
                __.replaceDocument(oldEntity._self, newEntity, function (err) {
                    if (err) { throw err; }
                });
            }
        }
    );
}

But when several processes run concurrently, I end up with two items that both have type = "type3".

According to this document, the execution of a stored procedure is transactional. Why, then, do I get duplicate items with type = "type3"?

There are 2 answers

David Makogon

This isn't about a concurrency or transactional issue; it's about unique key constraints (or lack of unique key constraint). The only unique constraint that exists by default is with /id.

If you add /type as a unique key constraint, then this would prevent, say, multiple documents with type = "type1".

Just note that this constraint is within a partition (and you haven't mentioned your partition key).

Also, you'll need to re-create your container - pretty sure you can't add a unique key constraint post-creation.
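
As a rough illustration of the unique key suggestion, a container definition for the JavaScript SDK (@azure/cosmos) might look like the sketch below. The container id "productTypes" and the partition key path "/category" are placeholders, since the question does not state them. The definition would be supplied when the container is created.

```javascript
// Sketch of a container definition with a unique key on /type.
// "productTypes" and "/category" are placeholder values, not from the question.
const containerDefinition = {
  id: "productTypes",
  partitionKey: { paths: ["/category"] },
  uniqueKeyPolicy: {
    // Uniqueness is enforced per logical partition, not across the container.
    uniqueKeys: [{ paths: ["/type"] }],
  },
};

// With a `database` object from @azure/cosmos, this would be used roughly as:
//   await database.containers.createIfNotExists(containerDefinition);
console.log(JSON.stringify(containerDefinition.uniqueKeyPolicy));
```

With such a policy in place, a second create for the same type in the same logical partition should be rejected with a 409 Conflict instead of producing a duplicate.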

More info here

Matias Quaranta

You could use Optimistic Concurrency control.

This would be the pseudocode; the actual code depends on the language and SDK you are using:

  1. Perform a read to detect if it exists or not.
  2. If it doesn't exist, attempt the Create.
  3. If it does exist, capture the read response's ETag.
  4. Update item and pass the ETag of the read to the replace operation through the IfMatch option.
  5. If there was a concurrent update to the same document, you will get a failure with HTTP 412 status code.
  6. If you need to, repeat the process: read again, get the latest ETag, and send the Replace again (assuming you still need to execute the Replace under this condition).

EDIT: To clarify, you execute these steps outside of a stored procedure; they are just operations from the client.
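
The numbered steps above can be sketched in JavaScript as follows. This is only an illustration under stated assumptions: FakeContainer is a hypothetical in-memory stand-in for a real container, keyed by type (which corresponds to a real container where type is the id or a unique key), and insertOrReplace is an illustrative name. With a real SDK, the same flow would use its read, create, and replace calls, passing the ETag through the IfMatch option.

```javascript
// Hypothetical in-memory stand-in for a container keyed by `type`.
// It mimics the status codes named above: 409 on a duplicate create,
// 412 when the supplied ETag no longer matches the stored item.
class FakeContainer {
  constructor() { this.items = new Map(); this.version = 0; }
  read(type) {
    const item = this.items.get(type);
    return item ? { ...item } : undefined;
  }
  create(doc) {
    if (this.items.has(doc.type)) throw Object.assign(new Error("Conflict"), { code: 409 });
    const stored = { ...doc, _etag: String(++this.version) };
    this.items.set(doc.type, stored);
    return { ...stored };
  }
  replace(doc, ifMatch) {
    const current = this.items.get(doc.type);
    if (!current) throw Object.assign(new Error("Not found"), { code: 404 });
    if (current._etag !== ifMatch) throw Object.assign(new Error("Precondition failed"), { code: 412 });
    const stored = { ...doc, _etag: String(++this.version) };
    this.items.set(doc.type, stored);
    return { ...stored };
  }
}

// Illustrative helper: read, then create or ETag-guarded replace, retrying on conflicts.
function insertOrReplace(container, entity, maxAttempts = 5) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const existing = container.read(entity.type);             // step 1
    try {
      if (existing === undefined) {
        return container.create(entity);                      // step 2
      }
      const updated = { ...existing, introduction: entity.introduction };
      return container.replace(updated, existing._etag);      // steps 3 and 4
    } catch (err) {
      // 409: another writer created the item first; 412: another writer replaced it first.
      // Anything else is unexpected and is rethrown; otherwise loop and read again (step 6).
      if (err.code !== 409 && err.code !== 412) throw err;
    }
  }
  throw new Error("insertOrReplace: gave up after " + maxAttempts + " attempts");
}

const container = new FakeContainer();
insertOrReplace(container, { type: "type3", introduction: "this is type 3" });
insertOrReplace(container, { type: "type3", introduction: "updated" });
console.log(container.items.size); // 1
```

The retry loop is bounded so that a persistently failing write surfaces as an error instead of spinning forever; the limit of 5 is arbitrary.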