_Damilare
Dr. Dre & the Big Steppers

Create an E-commerce API with Django REST framework and a PlanetScale database

A walk-through of the process of building an e-commerce API with a PlanetScale database

_Damilare
·Jul 31, 2022·

11 min read

Table of contents

  • PlanetScale
  • Setup
  • Custom User Model
  • Store
  • Models
  • Serializers
  • Views & Viewsets
  • User Authentication
  • Conclusion

Here's what we are going to build: API docs

PlanetScale

PlanetScale builds a database-as-a-service offering on Vitess, an open source sharding middleware system for MySQL that powers products such as YouTube, Slack, Square and many more. Vitess helps you scale a MySQL database by allowing you to shard it. Sharding is a database architecture pattern related to horizontal partitioning, in which tables are split by row, usually within a single instance of a schema and a database server.

Sharding goes beyond this by partitioning tables in the same way across multiple instances of the schema. The advantage is that the search load for a large partitioned table can be split across multiple servers (physical shards), not just across multiple indexes on the same server (logical shards). A physical shard can hold multiple logical shards. This exemplifies a shared-nothing architecture.

This makes sharding useful for:

  • distributed computing.
  • reducing search effort in systems where there's an implicit way to identify which partition a particular row will be found in without having to first query the index; for instance, splitting the data based on the user's zip code or location.
  • and mitigating the impact of outages which can make applications with large monolithic databases completely unavailable.
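
To make the "implicit way to identify which partition" idea concrete, here is a minimal sketch of key-based routing. It is only an illustration: the shard names and the functions `shard_for` and `route_rows` are hypothetical, and Vitess has its own routing machinery that this does not try to reproduce.

```python
import hashlib

# Hypothetical shard names; a real deployment would map these to servers.
SHARDS = ["shard-0", "shard-1", "shard-2", "shard-3"]


def shard_for(key: str, shards=SHARDS) -> str:
    """Pick a shard from the key alone, without consulting any index."""
    # Use a stable hash (Python's built-in hash() is salted per process).
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return shards[int.from_bytes(digest[:8], "big") % len(shards)]


def route_rows(rows, key_field="zip_code"):
    """Group rows by the shard their key maps to."""
    placement = {name: [] for name in SHARDS}
    for row in rows:
        placement[shard_for(row[key_field])].append(row)
    return placement
```

Because the shard is computed from the key, a lookup for a given zip code only has to touch one shard instead of searching all of them.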

While Vitess reduces the operational burden of managing a large fleet of MySQL instances, it comes with its own operational complexity.

PlanetScale improves upon Vitess by making it easier to use and adopt, especially when changing the schema of a running system: it offers non-blocking schema changes and automatically checks for potential conflicts before a schema change is deployed. Let's get started.

Setup

This write-up assumes the reader has a basic understanding of:

  • Python
  • Django
  • REST APIs

Virtual environment

In this walkthrough I'm using pipenv, but you can swap it for poetry or venv. Create and activate your virtual environment with the following command:

pipenv shell

Install the project requirements:

pipenv install django==3.2 djangorestframework==3.13.1 cloudinary==1.29.0

Start your project with the following command:

django-admin startproject config .

Add rest_framework to your list of INSTALLED_APPS in settings.py:

INSTALLED_APPS = [
    ...
    # third party
    "rest_framework",
]

Database

Start by creating a free PlanetScale account here: Sign up - PlanetScale

Then create your database:

And get your database parameters by clicking connect:

Set them as environment variables or add them to a .env file like so:

DB_NAME=<DATABASE_NAME>
DB_USER=<DATABASE_USER>
DB_PASSWORD=<DATABASE_PASSWORD>
DB_HOST=<HOST>
DB_PORT=<PORT_NUMBER>
MYSQL_ATTR_SSL_CA=<PATH_TO_SSL_CA_CERT>
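
Django does not read a .env file on its own, so the values have to reach os.environ somehow. Packages such as python-dotenv handle this robustly; the block below is only a standard-library sketch of the idea, and the helper names `parse_env` and `load_env_file` are hypothetical.

```python
import os


def parse_env(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blanks and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop optional surrounding quotes around the value.
        values[key.strip()] = value.strip().strip("\"'")
    return values


def load_env_file(path: str = ".env") -> None:
    """Copy values from the file into os.environ without overriding existing ones."""
    with open(path, encoding="utf-8") as handle:
        for key, value in parse_env(handle.read()).items():
            os.environ.setdefault(key, value)
```

Calling `load_env_file()` near the top of settings.py, before the database settings are read, would make the variables available to `os.environ.get`.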

In settings.py, configure your database with the environment variables:

import os

DATABASES = {
    "default": {
        "ENGINE": "django_psdb_engine",
        "NAME": os.environ.get("DB_NAME"),
        "HOST": os.environ.get("DB_HOST"),
        "PORT": os.environ.get("DB_PORT"),
        "USER": os.environ.get("DB_USER"),
        "PASSWORD": os.environ.get("DB_PASSWORD"),
        "OPTIONS": {"ssl": {"ca": os.environ.get("MYSQL_ATTR_SSL_CA")}},
    }
}

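The next step is to bring in the PlanetScale custom database wrapper. PlanetScale doesn't support foreign key constraints, so the wrapper is required to disable foreign key syntax in Django migrations. Because the wrapper builds on Django's MySQL backend, you also need a MySQL driver such as mysqlclient installed in your virtual environment. Clone the wrapper into the root of your project so that the django_psdb_engine package can be imported: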

git clone https://github.com/planetscale/django_psdb_engine.git

For more information on setting up PlanetScale, check out their guide.

Apps

We're going to work with two apps in this project: store and profiles. The profiles app handles our custom user model, and the store app handles our e-commerce functionality.

python manage.py startapp store
python manage.py startapp profiles

Add both apps to the list of INSTALLED_APPS in settings.py:

INSTALLED_APPS = [
    ...
    # third party
    "rest_framework",
    # local apps
    "store",
    "profiles",
]

Include the store app's URLs in the project's urls.py (the ROOT_URLCONF):

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("store/", include("store.urls")),
]

Custom User Model

In profiles/models.py add the following:


from django.contrib.auth.models import AbstractUser
from django.db import models


class User(AbstractUser):
    email = models.EmailField(unique=True)

Then point Django at the custom user model in your settings file:

AUTH_USER_MODEL = "profiles.User"

You can now create the migration for the custom user model and run your initial migrations with:

python manage.py makemigrations profiles
python manage.py migrate

Store

Models

In store/models.py add the following:


from django.core.validators import MinValueValidator
from django.db import models
from django.conf import settings
from django.contrib import admin
from uuid import uuid4
from cloudinary.models import CloudinaryField


class Promotion(models.Model):
    description = models.CharField(max_length=255)
    discount = models.FloatField()


class Collection(models.Model):
    title = models.CharField(max_length=255)
    featured_product = models.ForeignKey(
        "Product", on_delete=models.SET_NULL, null=True, related_name="+", blank=True
    )

    def __str__(self) -> str:
        return self.title

    class Meta:
        ordering = ["title"]


class Product(models.Model):
    title = models.CharField(max_length=255)
    slug = models.SlugField()
    description = models.TextField(null=True, blank=True)
    unit_price = models.DecimalField(
        max_digits=6, decimal_places=2, validators=[MinValueValidator(1)]
    )
    inventory = models.IntegerField(validators=[MinValueValidator(0)])
    last_update = models.DateTimeField(auto_now=True)
    collection = models.ForeignKey(
        Collection, on_delete=models.PROTECT, related_name="products"
    )

    promotions = models.ManyToManyField(Promotion, blank=True)

    def __str__(self) -> str:
        return self.title

    class Meta:
        ordering = ["title"]


class Image(models.Model):
    time_created = models.DateTimeField(auto_now_add=True)
    title = models.CharField("Title (optional)", max_length=200, blank=True)

    # Points to a Cloudinary image
    image = CloudinaryField("image")
    product = models.OneToOneField(Product, on_delete=models.CASCADE)

    def __str__(self):
        """Informative name for model"""
        try:
            public_id = self.image.public_id
        except AttributeError:
            public_id = ""
        return "Photo <%s:%s>" % (self.title, public_id)


class Customer(models.Model):
    MEMBERSHIP_BRONZE = "B"
    MEMBERSHIP_SILVER = "S"
    MEMBERSHIP_GOLD = "G"

    MEMBERSHIP_CHOICES = [
        (MEMBERSHIP_BRONZE, "Bronze"),
        (MEMBERSHIP_SILVER, "Silver"),
        (MEMBERSHIP_GOLD, "Gold"),
    ]

    membership = models.CharField(
        max_length=1, choices=MEMBERSHIP_CHOICES, default=MEMBERSHIP_BRONZE
    )
    phone = models.CharField(blank=True, max_length=255)
    birth_date = models.DateField(null=True, blank=True)
    user = models.OneToOneField(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)

    def __str__(self):
        return f"{self.user.first_name} {self.user.last_name}"

    @admin.display(ordering="user__first_name")
    def first_name(self):
        return self.user.first_name

    @admin.display(ordering="user__last_name")
    def last_name(self):
        return self.user.last_name

    class Meta:
        ordering = ["user__first_name", "user__last_name"]


class Order(models.Model):
    PAYMENT_STATUS_PENDING = "P"
    PAYMENT_STATUS_COMPLETE = "C"
    PAYMENT_STATUS_FAILED = "F"
    PAYMENT_STATUS_CHOICES = [
        (PAYMENT_STATUS_PENDING, "Pending"),
        (PAYMENT_STATUS_COMPLETE, "Complete"),
        (PAYMENT_STATUS_FAILED, "Failed"),
    ]

    placed_at = models.DateTimeField(auto_now_add=True)
    payment_status = models.CharField(
        max_length=1, choices=PAYMENT_STATUS_CHOICES, default=PAYMENT_STATUS_PENDING
    )
    customer = models.ForeignKey(Customer, on_delete=models.PROTECT)

    class Meta:
        permissions = [("cancel_order", "Can cancel order")]


class OrderItem(models.Model):
    order = models.ForeignKey(Order, on_delete=models.PROTECT, related_name="items")
    product = models.ForeignKey(
        Product, on_delete=models.PROTECT, related_name="orderitems"
    )
    quantity = models.PositiveSmallIntegerField()
    unit_price = models.DecimalField(max_digits=6, decimal_places=2)


class Address(models.Model):
    street = models.CharField(max_length=255)
    city = models.CharField(max_length=255)
    customer = models.ForeignKey(Customer, on_delete=models.CASCADE)


class Cart(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid4)
    created_at = models.DateTimeField(auto_now_add=True)


class CartItem(models.Model):
    cart = models.ForeignKey(Cart, on_delete=models.CASCADE, related_name="items")
    product = models.ForeignKey(Product, on_delete=models.CASCADE)
    quantity = models.PositiveSmallIntegerField(validators=[MinValueValidator(1)])

    class Meta:
        unique_together = [["cart", "product"]]


class Review(models.Model):
    body = models.TextField()
    name = models.CharField(max_length=56)
    product = models.ForeignKey(
        Product, on_delete=models.CASCADE, related_name="reviews"
    )
    date = models.DateField(auto_now_add=True)

In the code above we created the database models for our app and included cloudinary to handle image uploads.

Next, create a signal handler so that a customer profile is automatically created for every new user. Start by creating a signals.py file in your store app and add the code below to it:

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.conf import settings
from .models import Customer


@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_customer_for_new_user(sender, **kwargs):
    if kwargs["created"]:
        Customer.objects.create(user=kwargs["instance"])

Then modify the ready method in the StoreConfig class in apps.py:

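from django.apps import AppConfig


class StoreConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "store"

    def ready(self) -> None:
        # Import the signal handlers so they are registered at startup.
        from . import signals  # noqa: F401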

Now create and run your second set of migrations:

python manage.py makemigrations store
python manage.py migrate

Serializers


In store/serializers.py add the following:

from typing import Dict
from decimal import Decimal
from django.db import transaction
from rest_framework import serializers
from .models import (
    Cart,
    CartItem,
    Customer,
    Image,
    Order,
    OrderItem,
    Product,
    Collection,
    Review,
)


class CollectionSerializer(serializers.ModelSerializer):
    class Meta:
        model = Collection
        fields = ("id", "title", "product_count")

    product_count = serializers.IntegerField(read_only=True)


class ImageSerializer(serializers.ModelSerializer):
    image = serializers.ImageField()

    class Meta:
        model = Image
        fields = ("title", "image", "product")


class ProductSerialzer(serializers.ModelSerializer):
    # Read-only: images are uploaded separately, and a product may not have one yet.
    image = ImageSerializer(read_only=True)

    class Meta:
        model = Product
        fields = (
            "id",
            "title",
            "description",
            "slug",
            "inventory",
            "unit_price",
            "price_with_tax",
            "collection",
            "image",
        )

    price_with_tax = serializers.SerializerMethodField("calculate_tax")

    def calculate_tax(self, product: Product) -> Decimal:
        # Build the rate from a string; Decimal(1.1) would inherit float rounding error.
        return product.unit_price * Decimal("1.1")


class ReviewSerializer(serializers.ModelSerializer):
    class Meta:
        model = Review
        fields = ("id", "date", "name", "body")

    def create(self, validated_data: Dict):
        product_id = self.context.get("product_id")
        return Review.objects.create(product_id=product_id, **validated_data)


class BasicProductSerializer(serializers.ModelSerializer):
    class Meta:
        model = Product
        fields = ("id", "title", "unit_price")


class CartItemSerializer(serializers.ModelSerializer):
    product = BasicProductSerializer()
    total_price = serializers.SerializerMethodField()

    def get_total_price(self, cartitem: CartItem) -> Decimal:
        return cartitem.quantity * cartitem.product.unit_price

    class Meta:
        model = CartItem
        fields = ("id", "product", "quantity", "total_price")


class CartSerializer(serializers.ModelSerializer):
    id = serializers.UUIDField(read_only=True)
    items = CartItemSerializer(many=True, read_only=True)
    total_price = serializers.SerializerMethodField()

    def get_total_price(self, cart: Cart) -> Decimal:
        return sum(item.quantity * item.product.unit_price for item in cart.items.all())

    class Meta:
        model = Cart
        fields = ("id", "items", "total_price")


class AddCartItemSerializer(serializers.ModelSerializer):
    product_id = serializers.IntegerField()

    def validate_product_id(self, value: int):
        if not Product.objects.filter(pk=value).exists():
            raise serializers.ValidationError("No product with the given ID exists")
        return value

    def save(self, **kwargs):
        quantity = self.validated_data.get("quantity")
        product_id = self.validated_data.get("product_id")
        cart_id = self.context.get("cart_id")
        try:
            cart_item = CartItem.objects.get(cart_id=cart_id, product_id=product_id)
            cart_item.quantity += quantity
            cart_item.save()
            self.instance = cart_item
        except CartItem.DoesNotExist:
            self.instance = CartItem.objects.create(
                cart_id=cart_id, **self.validated_data
            )
        return self.instance

    class Meta:
        model = CartItem
        fields = ("id", "product_id", "quantity")


class UpdateCartItemSerializer(serializers.ModelSerializer):
    class Meta:
        model = CartItem
        fields = ("quantity",)


class CustomerSerializer(serializers.ModelSerializer):
    user_id = serializers.IntegerField(read_only=True)

    class Meta:
        model = Customer
        fields = ("id", "user_id", "phone", "birth_date", "membership")


class OrderItemSerializer(serializers.ModelSerializer):
    product = BasicProductSerializer()

    class Meta:
        model = OrderItem
        fields = ("id", "product", "unit_price", "quantity")


class OrderSerializer(serializers.ModelSerializer):
    items = OrderItemSerializer(many=True)

    class Meta:
        model = Order
        fields = ("id", "customer", "placed_at", "payment_status", "items")


class UpdateOrderSerializer(serializers.ModelSerializer):
    class Meta:
        model = Order
        fields = ("payment_status",)


class CreateOrderSerializer(serializers.Serializer):
    cart_id = serializers.UUIDField()

    def validate_cart_id(self, cart_id):
        if not Cart.objects.filter(pk=cart_id).exists():
            raise serializers.ValidationError("No cart with the given ID was found.")
        if CartItem.objects.filter(cart_id=cart_id).count() == 0:
            raise serializers.ValidationError("The cart is empty.")
        return cart_id

    def save(self, **kwargs):
        with transaction.atomic():
            cart_id = self.validated_data.get("cart_id")

            customer = Customer.objects.get(user_id=self.context.get("user_id"))
            order = Order.objects.create(customer=customer)

            cart_items = CartItem.objects.select_related("product").filter(
                cart_id=cart_id
            )
            order_items = [
                OrderItem(
                    order=order,
                    product=item.product,
                    unit_price=item.product.unit_price,
                    quantity=item.quantity,
                )
                for item in cart_items
            ]
            OrderItem.objects.bulk_create(order_items)

            return order

In the code above we defined all the serializers we need to parse and validate incoming data and to convert model instances into a serializable format.
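
The price calculations in these serializers are plain Decimal arithmetic. Here is a minimal standalone sketch of the same math, assuming the 10% markup used by calculate_tax; the constant `TAX_RATE` and the function names are hypothetical. The rate is built from a string because a Decimal built from the float 1.1 carries the float's rounding error.

```python
from decimal import Decimal, ROUND_HALF_UP

TAX_RATE = Decimal("1.1")  # assumed 10% markup, as in calculate_tax


def price_with_tax(unit_price: Decimal) -> Decimal:
    """Apply the tax rate and round to two decimal places."""
    return (unit_price * TAX_RATE).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def cart_total(items) -> Decimal:
    """Sum quantity * unit_price over (quantity, unit_price) pairs."""
    return sum((qty * price for qty, price in items), Decimal("0"))
```

Starting the sum from `Decimal("0")` keeps the result a Decimal even for an empty cart, where a bare `sum()` would return the integer 0.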

Views & Viewsets


In store/views.py add the following:

from django.db.models.aggregates import Count
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.decorators import action
from rest_framework import status
from rest_framework.permissions import IsAuthenticated, AllowAny, IsAdminUser
from rest_framework.filters import SearchFilter, OrderingFilter
from rest_framework.response import Response
from rest_framework.generics import GenericAPIView
from rest_framework.viewsets import ModelViewSet, GenericViewSet
from rest_framework.mixins import (
    CreateModelMixin,
    RetrieveModelMixin,
    DestroyModelMixin,
)
import cloudinary.uploader
from .permissions import IsAdminOrReadOnly

from .filters import ProductFilter
from .serializers import (
    AddCartItemSerializer,
    CartItemSerializer,
    CartSerializer,
    CollectionSerializer,
    CreateOrderSerializer,
    CustomerSerializer,
    ImageSerializer,
    OrderSerializer,
    ProductSerialzer,
    ReviewSerializer,
    UpdateCartItemSerializer,
    UpdateOrderSerializer,
)
from .models import (
    CartItem,
    Collection,
    Customer,
    Order,
    Product,
    OrderItem,
    Review,
    Cart,
)
from .pagination import DefaultProductPagination


class ProductViewSet(ModelViewSet):

    serializer_class = ProductSerialzer
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_class = ProductFilter
    search_fields = ["title", "description"]
    pagination_class = DefaultProductPagination
    permission_classes = (IsAdminOrReadOnly,)

    def get_queryset(self):
        queryset = Product.objects.all()
        collection_id = self.request.query_params.get("collection_id")
        if collection_id:
            queryset = queryset.filter(
                collection_id=self.request.query_params.get("collection_id")
            )
        return queryset

    def get_serializer_context(self):
        return {"request": self.request}

    def destroy(self, request, *args, **kwargs):
        # Refuse to delete products that already appear in orders.
        if OrderItem.objects.filter(product_id=kwargs.get("pk")).exists():
            return Response(
                {
                    "error": "Product cannot be deleted because there are orders associated with it"
                },
                status=status.HTTP_409_CONFLICT,
            )
        return super().destroy(request, *args, **kwargs)


class CollectionViewSet(ModelViewSet):

    queryset = Collection.objects.annotate(product_count=Count("products")).all()
    serializer_class = CollectionSerializer
    permission_classes = (IsAdminOrReadOnly,)

    def destroy(self, request, *args, **kwargs):
        if Product.objects.filter(collection_id=kwargs.get("pk")).exists():
            return Response(
                {
                    "error": "Collection cannot be deleted because it contains one or more products"
                },
                status=status.HTTP_409_CONFLICT,
            )
        return super().destroy(request, *args, **kwargs)


class ReviewViewSet(ModelViewSet):

    serializer_class = ReviewSerializer

    def get_queryset(self):
        return Review.objects.filter(product_id=self.kwargs.get("product_pk"))

    def get_serializer_context(self):
        return {"product_id": self.kwargs.get("product_pk")}

    def create(self, request, *args, **kwargs):
        product_id = self.kwargs.get("product_pk")
        # Reject reviews for products that don't exist.
        if not Product.objects.filter(id=product_id).exists():
            return Response(
                {"error": "Cannot create a review for a product that doesn't exist"},
                status=status.HTTP_404_NOT_FOUND,
            )
        return super().create(request, *args, **kwargs)


class CartViewSet(
    GenericViewSet, CreateModelMixin, RetrieveModelMixin, DestroyModelMixin
):
    queryset = Cart.objects.prefetch_related("items__product").all()
    serializer_class = CartSerializer


class CartItemViewSet(ModelViewSet):
    http_method_names = ["get", "post", "patch", "delete"]

    def get_serializer_class(self):
        serializer_dict = {
            "POST": AddCartItemSerializer,
            "PATCH": UpdateCartItemSerializer,
            "DEFAULT": CartItemSerializer,
        }
        return serializer_dict.get(self.request.method, serializer_dict["DEFAULT"])

    def get_queryset(self):
        return CartItem.objects.filter(cart_id=self.kwargs["cart_pk"]).select_related(
            "product"
        )

    def get_serializer_context(self):
        return {"cart_id": self.kwargs.get("cart_pk")}


class CustomerViewSet(ModelViewSet):
    queryset = Customer.objects.all()
    serializer_class = CustomerSerializer
    permission_classes = (IsAdminUser,)

    @action(detail=False, methods=["GET", "PUT"], permission_classes=(IsAuthenticated,))
    def me(self, request):
        customer = Customer.objects.get(user_id=request.user.id)
        if request.method == "GET":
            serializer = CustomerSerializer(customer)
            return Response(serializer.data)
        # Only GET and PUT reach this action, so this branch handles PUT.
        serializer = CustomerSerializer(customer, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data)


class OrderViewSet(ModelViewSet):
    http_method_names = ["get", "post", "patch", "delete", "head", "options"]

    def get_permissions(self):
        if self.request.method in ["PATCH", "DELETE"]:
            return (IsAdminUser(),)
        return (IsAuthenticated(),)

    def get_queryset(self):
        user = self.request.user
        if user.is_staff:
            return Order.objects.all()
        customer = Customer.objects.only("id").get(user_id=user.id)
        return Order.objects.filter(customer_id=customer.id)

    def create(self, request, *args, **kwargs):
        # Pass the context so CreateOrderSerializer.save() can read user_id.
        serializer = CreateOrderSerializer(
            data=request.data, context=self.get_serializer_context()
        )
        serializer.is_valid(raise_exception=True)
        order = serializer.save()
        serializer = OrderSerializer(order)
        return Response(serializer.data)

    def get_serializer_context(self):
        return {"user_id": self.request.user.id}

    def get_serializer_class(self):

        if self.request.method == "POST":
            return CreateOrderSerializer
        if self.request.method == "PATCH":
            return UpdateOrderSerializer
        return OrderSerializer


class ImageUploadView(CreateModelMixin, GenericAPIView):

    serializer_class = ImageSerializer

    def post(self, request):
        image = request.data.get("image")
        serializer = ImageSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()

        cloudinary.uploader.upload(image)
        return Response(serializer.data)

Next, create the associated pagination, filter, and permission classes. In pagination.py add the following:

from rest_framework.pagination import PageNumberPagination


class DefaultProductPagination(PageNumberPagination):
    page_size = 10

The filtering functionality depends on django-filter. Install it in your virtual environment (pipenv install django-filter) and add django_filters to your INSTALLED_APPS. Then include the following code in filters.py:

from django_filters.rest_framework import FilterSet
from .models import Product


class ProductFilter(FilterSet):
    class Meta:
        model = Product
        fields = {"collection_id": ["exact"], "unit_price": ["gt", "lt"]}
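
django-filter derives the query parameter names from this mapping: an exact lookup uses the bare field name, while other lookups are appended as a double-underscore suffix. Here is a small sketch of building such a request URL; the base address and the helper `products_url` are assumptions for illustration.

```python
from urllib.parse import urlencode

# Hypothetical local development address for the product list endpoint.
BASE_URL = "http://localhost:8000/store/products/"


def products_url(**filters) -> str:
    """Build a product list URL, dropping filters that were not supplied."""
    params = {name: value for name, value in filters.items() if value is not None}
    return f"{BASE_URL}?{urlencode(params)}" if params else BASE_URL
```

For example, `products_url(collection_id=1, unit_price__gt=10, unit_price__lt=50)` asks for products in collection 1 priced between 10 and 50.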

In permissions.py create the custom permission class:

from rest_framework import permissions

class IsAdminOrReadOnly(permissions.BasePermission):
    def has_permission(self, request, view):
        if request.method in permissions.SAFE_METHODS:
            return True
        return bool(request.user and request.user.is_staff)
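
The rule this permission class encodes can be written as a plain function, which makes it easy to see which requests get through. This is a sketch for illustration only; `is_allowed` is a hypothetical name, and the tuple mirrors the methods DRF treats as safe.

```python
# Mirrors rest_framework.permissions.SAFE_METHODS.
SAFE_METHODS = ("GET", "HEAD", "OPTIONS")


def is_allowed(method: str, is_staff: bool) -> bool:
    """Anyone may read; only staff users may write."""
    return method in SAFE_METHODS or is_staff
```

So an anonymous visitor can list products with GET, but a POST, PATCH or DELETE is refused unless the user is staff.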

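Create a urls.py file in the store app for the routers. The nested routes rely on the drf-nested-routers package, so install it first (pipenv install drf-nested-routers), then add the following: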

from django.urls import path
from . import views
from rest_framework_nested import routers

router = routers.DefaultRouter()
router.register("products", views.ProductViewSet, basename="products")
router.register("collections", views.CollectionViewSet)
router.register("carts", views.CartViewSet)
router.register("customers", views.CustomerViewSet)
router.register("orders", views.OrderViewSet, basename="orders")

cart_router = routers.NestedDefaultRouter(router, "carts", lookup="cart")
cart_router.register("items", views.CartItemViewSet, basename="cart-items")

product_router = routers.NestedDefaultRouter(router, "products", lookup="product")
product_router.register("reviews", views.ReviewViewSet, basename="product-reviews")
image_upload_router = [path("images", views.ImageUploadView.as_view())]
urlpatterns = router.urls + product_router.urls + cart_router.urls + image_upload_router
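
As a quick reference, here is a sketch of the URL shapes these routers should produce, assuming the store app is mounted under store/ and the routers keep their default trailing slashes. The helper names are hypothetical; the cart and product identifiers correspond to the cart_pk and product_pk keyword arguments the viewsets read.

```python
PREFIX = "/store"  # assumes path("store/", include("store.urls")) in the project urls


def cart_items_url(cart_id, item_id=None) -> str:
    """List URL for a cart's items, or the detail URL when item_id is given."""
    base = f"{PREFIX}/carts/{cart_id}/items/"
    return base if item_id is None else f"{base}{item_id}/"


def product_reviews_url(product_id, review_id=None) -> str:
    """List URL for a product's reviews, or the detail URL when review_id is given."""
    base = f"{PREFIX}/products/{product_id}/reviews/"
    return base if review_id is None else f"{base}{review_id}/"
```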

User Authentication

To set up JSON Web Token (JWT) authentication we're going to use djoser together with djangorestframework-simplejwt, which djoser depends on for JWT support. Start by installing them:

pipenv install djoser djangorestframework-simplejwt

Add djoser to your installed apps and wire it up to your project's URLs:

settings.py

INSTALLED_APPS=[
    ...
    "djoser",
]

urls.py

urlpatterns=[
    ...
    path("auth/", include("djoser.urls")),
    path("auth/", include("djoser.urls.jwt")),
]
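
The URLs above expose the token endpoints, but Django REST framework also has to be told to accept those tokens on incoming requests. A minimal sketch of the setting, along the lines of what djoser's documentation suggests for JWT; treat it as a starting point and adjust it to your project:

```python
# settings.py (sketch): authenticate API requests with simplejwt's JWT backend.
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": (
        "rest_framework_simplejwt.authentication.JWTAuthentication",
    ),
}
```

Without an authentication class that understands JWTs, views protected by IsAuthenticated or IsAdminUser would reject requests that carry a valid token.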

Finally, modify the djoser serializers to include first and last name fields in registration. In the profiles app create a new file serializers.py with this code:

from djoser.serializers import (
    UserSerializer,
    UserCreateSerializer as BaseUserCreateSerializer,
)


class UserCreateSerializer(BaseUserCreateSerializer):
    class Meta(BaseUserCreateSerializer.Meta):
        fields = (
            "id",
            "username",
            "password",
            "email",
            "first_name",
            "last_name",
        )


class UserDetailSerializer(UserSerializer):
    class Meta(UserSerializer.Meta):
        fields = ("id", "email", "username", "first_name", "last_name")

And modify djoser's settings in settings.py to use your newly defined serializers:

DJOSER = {
    "SERIALIZERS": {
        "user_create": "profiles.serializers.UserCreateSerializer",
        "current_user": "profiles.serializers.UserDetailSerializer",
    }
}
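
With authentication wired up, a client first exchanges credentials for a token pair at djoser's jwt/create/ endpoint and then sends the access token with each request. The sketch below only builds the requests using the standard library; the server address and helper names are assumptions, and the "Bearer" prefix is simplejwt's default, so it has to match your AUTH_HEADER_TYPES setting if you change it.

```python
import json
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # hypothetical development server


def token_request(username: str, password: str) -> Request:
    """Build the POST that exchanges credentials for access and refresh tokens."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return Request(
        f"{BASE_URL}/auth/jwt/create/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def authorized_request(path: str, access_token: str, header_type: str = "Bearer") -> Request:
    """Build a GET request that carries the access token."""
    return Request(
        f"{BASE_URL}{path}",
        headers={"Authorization": f"{header_type} {access_token}"},
    )
```

Passing either request to `urllib.request.urlopen` would send it; for example, `authorized_request("/store/orders/", token)` targets the orders endpoint, which requires an authenticated user.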

Conclusion

In this article we saw how quickly you can set up a database with PlanetScale, and built a fully functioning Django application on top of it.

The complete project can be found on my GitHub. Here's a sample frontend built with the API: valdivia

Thank you for reading up to this point, I'll catch you guys on the next one👋.
