AWS Lambda Serverless Development: Complete 2026 Guide
in Development on AWS, Lambda, Serverless, Cloud
AWS Lambda Serverless Development: Complete 2026 Guide
AWS Lambda continues to dominate serverless computing in 2026. This guide covers everything from basics to advanced patterns for building production-grade serverless applications.
Why AWS Lambda?
- No server management - Focus on code, not infrastructure
- Automatic scaling - From zero to thousands of concurrent executions
- Pay per use - Only pay for compute time consumed
- Event-driven - Integrate with 200+ AWS services
- Multiple runtimes - Python, Node.js, Java, Go, Rust, and custom runtimes
Lambda Fundamentals
Basic Function Structure
# handler.py
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
"""
Lambda handler function
Args:
event: Event data (API Gateway, S3, SQS, etc.)
context: Runtime information
Returns:
Response object
"""
logger.info(f"Event: {json.dumps(event)}")
try:
# Your business logic here
result = process_event(event)
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'message': 'Success',
'data': result
})
}
except ValueError as e:
logger.error(f"Validation error: {e}")
return {
'statusCode': 400,
'body': json.dumps({'error': str(e)})
}
except Exception as e:
logger.error(f"Internal error: {e}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
def process_event(event):
# Business logic
body = json.loads(event.get('body', '{}'))
return {'processed': True, 'input': body}
Node.js Handler
// handler.js
export const handler = async (event, context) => {
console.log('Event:', JSON.stringify(event, null, 2));
try {
const body = JSON.parse(event.body || '{}');
const result = await processRequest(body);
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
body: JSON.stringify({
message: 'Success',
data: result,
}),
};
} catch (error) {
console.error('Error:', error);
return {
statusCode: error.statusCode || 500,
body: JSON.stringify({
error: error.message || 'Internal server error',
}),
};
}
};
async function processRequest(body) {
// Business logic
return { processed: true, timestamp: new Date().toISOString() };
}
Deployment with SAM
SAM Template
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless API Application
Globals:
Function:
Timeout: 30
MemorySize: 256
Runtime: python3.12
Architectures:
- arm64
Environment:
Variables:
STAGE: !Ref Stage
LOG_LEVEL: INFO
Parameters:
Stage:
Type: String
Default: dev
AllowedValues:
- dev
- staging
- prod
Resources:
# API Gateway
ApiGateway:
Type: AWS::Serverless::Api
Properties:
StageName: !Ref Stage
Cors:
AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
AllowHeaders: "'Content-Type,Authorization'"
AllowOrigin: "'*'"
Auth:
DefaultAuthorizer: CognitoAuthorizer
Authorizers:
CognitoAuthorizer:
UserPoolArn: !GetAtt UserPool.Arn
# Lambda Functions
GetItemsFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers/items.get_items
Description: Get all items
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /items
Method: GET
Policies:
- DynamoDBReadPolicy:
TableName: !Ref ItemsTable
CreateItemFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers/items.create_item
Description: Create new item
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /items
Method: POST
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref ItemsTable
ProcessItemFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers/processor.process
Description: Process items from SQS
Events:
SQSEvent:
Type: SQS
Properties:
Queue: !GetAtt ProcessingQueue.Arn
BatchSize: 10
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref ItemsTable
# DynamoDB Table
ItemsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub ${AWS::StackName}-items
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
- AttributeName: gsi1pk
AttributeType: S
- AttributeName: gsi1sk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: GSI1
KeySchema:
- AttributeName: gsi1pk
KeyType: HASH
- AttributeName: gsi1sk
KeyType: RANGE
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
# SQS Queue
ProcessingQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub ${AWS::StackName}-processing
VisibilityTimeoutSeconds: 180
RedrivePolicy:
deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
maxReceiveCount: 3
DeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub ${AWS::StackName}-dlq
# Cognito
UserPool:
Type: AWS::Cognito::UserPool
Properties:
UserPoolName: !Sub ${AWS::StackName}-users
AutoVerifiedAttributes:
- email
Policies:
PasswordPolicy:
MinimumLength: 8
RequireUppercase: true
RequireLowercase: true
RequireNumbers: true
Outputs:
ApiEndpoint:
Description: API Gateway endpoint URL
Value: !Sub https://${ApiGateway}.execute-api.${AWS::Region}.amazonaws.com/${Stage}
ItemsTableName:
Description: DynamoDB table name
Value: !Ref ItemsTable
SAM Commands
# Build
sam build
# Local testing
sam local start-api
sam local invoke GetItemsFunction -e events/get-items.json
# Deploy
sam deploy --guided
# Sync for development
sam sync --watch --stack-name my-app-dev
Lambda Layers
Creating a Layer
# Create layer directory
mkdir -p layers/common/python
# Install dependencies
pip install requests boto3 -t layers/common/python/
# In template.yaml
CommonLayer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: common-dependencies
ContentUri: layers/common/
CompatibleRuntimes:
- python3.12
RetentionPolicy: Delete
Metadata:
BuildMethod: python3.12
Using Layers
GetItemsFunction:
Type: AWS::Serverless::Function
Properties:
# ...
Layers:
- !Ref CommonLayer
- arn:aws:lambda:us-east-1:123456789012:layer:aws-sdk-pandas:1
Advanced Patterns
1. Powertools for AWS Lambda
# requirements.txt
aws-lambda-powertools[all]
# handler.py
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.validation import validate
logger = Logger()
tracer = Tracer()
metrics = Metrics()
app = APIGatewayRestResolver()
@app.get("/items")
@tracer.capture_method
def get_items():
items = fetch_items()
metrics.add_metric(name="ItemsRetrieved", unit="Count", value=len(items))
return {"items": items}
@app.post("/items")
@tracer.capture_method
def create_item():
item = app.current_event.json_body
result = save_item(item)
return {"id": result["id"]}, 201
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event: dict, context: LambdaContext):
return app.resolve(event, context)
2. Event-Driven Architecture
# DynamoDB Stream processor
def process_stream(event, context):
for record in event['Records']:
if record['eventName'] == 'INSERT':
new_item = record['dynamodb']['NewImage']
# Process new item
handle_new_item(new_item)
elif record['eventName'] == 'MODIFY':
old_item = record['dynamodb']['OldImage']
new_item = record['dynamodb']['NewImage']
# Handle modification
handle_modification(old_item, new_item)
elif record['eventName'] == 'REMOVE':
old_item = record['dynamodb']['OldImage']
# Handle deletion
handle_deletion(old_item)
3. Step Functions Integration
# State machine definition
OrderProcessingStateMachine:
Type: AWS::Serverless::StateMachine
Properties:
DefinitionUri: statemachine/order-processing.asl.json
DefinitionSubstitutions:
ValidateOrderFunctionArn: !GetAtt ValidateOrderFunction.Arn
ProcessPaymentFunctionArn: !GetAtt ProcessPaymentFunction.Arn
FulfillOrderFunctionArn: !GetAtt FulfillOrderFunction.Arn
Policies:
- LambdaInvokePolicy:
FunctionName: !Ref ValidateOrderFunction
- LambdaInvokePolicy:
FunctionName: !Ref ProcessPaymentFunction
- LambdaInvokePolicy:
FunctionName: !Ref FulfillOrderFunction
// statemachine/order-processing.asl.json
{
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "${ValidateOrderFunctionArn}",
"Next": "ProcessPayment",
"Catch": [{
"ErrorEquals": ["ValidationError"],
"Next": "OrderFailed"
}]
},
"ProcessPayment": {
"Type": "Task",
"Resource": "${ProcessPaymentFunctionArn}",
"Next": "FulfillOrder",
"Retry": [{
"ErrorEquals": ["PaymentRetryableError"],
"MaxAttempts": 3,
"BackoffRate": 2
}]
},
"FulfillOrder": {
"Type": "Task",
"Resource": "${FulfillOrderFunctionArn}",
"End": true
},
"OrderFailed": {
"Type": "Fail",
"Error": "OrderProcessingFailed"
}
}
}
Performance Optimization
1. Cold Start Optimization
# Initialize outside handler (runs during cold start)
import boto3
# These are reused across invocations
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])
def handler(event, context):
# Handler uses pre-initialized resources
response = table.get_item(Key={'pk': event['id']})
return response.get('Item')
2. Provisioned Concurrency
GetItemsFunction:
Type: AWS::Serverless::Function
Properties:
# ...
AutoPublishAlias: live
ProvisionedConcurrencyConfig:
ProvisionedConcurrentExecutions: 10
3. SnapStart (Java)
JavaFunction:
Type: AWS::Serverless::Function
Properties:
Runtime: java21
SnapStart:
ApplyOn: PublishedVersions
AutoPublishAlias: live
Monitoring and Observability
CloudWatch Insights
# Query Lambda errors
fields @timestamp, @message
| filter @message like /ERROR/
| sort @timestamp desc
| limit 100
# Cold start analysis
filter @type = "REPORT"
| stats count(*) as invocations,
pct(@duration, 50) as p50,
pct(@duration, 99) as p99,
pct(@initDuration, 50) as cold_start_p50
by bin(1h)
X-Ray Tracing
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all() # Patch boto3, requests, etc.
@xray_recorder.capture('process_data')
def process_data(data):
# Traced function
return transform(data)
Security Best Practices
1. Least Privilege IAM
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- dynamodb:GetItem
- dynamodb:Query
Resource:
- !GetAtt ItemsTable.Arn
- !Sub ${ItemsTable.Arn}/index/*
2. Secrets Management
import boto3
from functools import lru_cache
secrets_client = boto3.client('secretsmanager')
@lru_cache(maxsize=1)
def get_secret(secret_name):
response = secrets_client.get_secret_value(SecretId=secret_name)
return response['SecretString']
def handler(event, context):
api_key = get_secret(os.environ['API_KEY_SECRET_ARN'])
# Use api_key
3. VPC Configuration
GetItemsFunction:
Type: AWS::Serverless::Function
Properties:
VpcConfig:
SecurityGroupIds:
- !Ref LambdaSecurityGroup
SubnetIds:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
Testing
Unit Tests
# test_handler.py
import pytest
from unittest.mock import patch, MagicMock
from handlers.items import get_items
@pytest.fixture
def api_gateway_event():
return {
'httpMethod': 'GET',
'path': '/items',
'queryStringParameters': {'limit': '10'}
}
@patch('handlers.items.table')
def test_get_items(mock_table, api_gateway_event):
mock_table.scan.return_value = {
'Items': [{'id': '1', 'name': 'Test'}]
}
result = get_items(api_gateway_event, None)
assert result['statusCode'] == 200
assert 'Test' in result['body']
Integration Tests
import requests
BASE_URL = "https://api-id.execute-api.us-east-1.amazonaws.com/prod"
def test_create_and_get_item():
# Create
create_response = requests.post(
f"{BASE_URL}/items",
json={"name": "Test Item"}
)
assert create_response.status_code == 201
item_id = create_response.json()['id']
# Get
get_response = requests.get(f"{BASE_URL}/items/{item_id}")
assert get_response.status_code == 200
assert get_response.json()['name'] == "Test Item"
Conclusion
AWS Lambda enables building scalable, cost-effective serverless applications. By following these patterns and best practices, you can create production-ready serverless architectures that scale automatically and minimize operational overhead.
Key takeaways:
- Use AWS SAM for infrastructure as code
- Leverage Lambda Powertools for observability
- Optimize for cold starts with best practices
- Implement proper error handling and retries
- Follow security best practices
Resources
이 글이 도움이 되셨다면 공감 및 광고 클릭을 부탁드립니다 :)
