Skip to content

Commit

Permalink
✨(backends) add graylog service
Browse files Browse the repository at this point in the history
The LDP backend is available for tests. But until now, no service was
available in the project. We have implemented a Graylog service for it.
  • Loading branch information
quitterie-lcs committed Nov 30, 2021
1 parent f42e684 commit 8d20434
Show file tree
Hide file tree
Showing 16 changed files with 580 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ RALPH_APP_DIR=/app/.ralph
# Uncomment lines (by removing # characters at the beginning of target lines)
# to define environment variables associated to the backend(s) you need.

# Graylog storage backend

# RALPH_GRAYLOG_HOST=graylog
# RALPH_GRAYLOG_PORT=12201

# LDP storage backend
#
# You need to generate an API token for your OVH's account and fill the service
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to

### Added

- `graylog` logging backend
- Implement edx problem interaction events pydantic models
- Implement edx textbook interaction events pydantic models
- `ws` websocket stream backend (compatible with the `fetch` command)
Expand Down
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ run: \
run-all: ## start all supported local backends
run-all: \
run-es \
run-graylog \
run-swift
.PHONY: run-all

Expand All @@ -123,6 +124,14 @@ run-es: ## start elasticsearch backend
@$(COMPOSE_RUN) dockerize -wait tcp://elasticsearch:9200 -timeout 60s
.PHONY: run-es

run-graylog: ## start graylog backend
@$(COMPOSE) up -d graylog
@echo "Waiting for graylog to be up and running..."
@$(COMPOSE_RUN) dockerize -wait tcp://mongo:27017 -timeout 60s
@$(COMPOSE_RUN) dockerize -wait tcp://elasticsearch:9200 -timeout 60s
@$(COMPOSE_RUN) dockerize -wait tcp://graylog:9000 -timeout 60s
.PHONY: run-graylog

run-swift: ## start swift backend
@$(COMPOSE) up -d swift
@echo "Waiting for swift to be up and running..."
Expand All @@ -138,7 +147,7 @@ stop: ## stops backend servers
.PHONY: stop

test: ## run back-end tests
test: run-es
test: run-es run-graylog
bin/pytest
.PHONY: test

Expand Down
27 changes: 25 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.4'
version: "3.4"

services:
app:
Expand All @@ -15,7 +15,9 @@ services:
PYLINTHOME: /app/.pylint.d
volumes:
- .:/app

networks:
default:

# -- backends
elasticsearch:
image: elasticsearch:7.13.3
Expand All @@ -37,6 +39,27 @@ services:
environment:
KS_SWIFT_PUBLIC_URL: http://127.0.0.1:49177

mongo:
image: mongo:4.2
networks:
default:

graylog:
image: graylog/graylog:4.2.1
environment:
- GRAYLOG_PASSWORD_SECRET=somepasswordpepper
- GRAYLOG_ROOT_PASSWORD_SHA2=d74ff0ee8da3b9806b18c877dbf29bbde50b5bd8e4dad7a3a725000feb82e8f1
- GRAYLOG_HTTP_EXTERNAL_URI=http://localhost:9000/
entrypoint: /usr/bin/tini -- wait-for-it elasticsearch:9200 -- /docker-entrypoint.sh
networks:
default:
depends_on:
- mongo
- elasticsearch
ports:
- 9000:9000
- 12201:12201

# -- tools
dockerize:
image: jwilder/dockerize
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dev =
ipython==7.30.0
isort==5.10.1
logging-gelf==0.0.26
marshmallow==3.13.0
memory-profiler==0.58.0
mkdocs==1.2.3
mkdocs-click==0.5.0
Expand Down Expand Up @@ -97,7 +98,7 @@ skip_glob=venv
profile=black

[tool:pytest]
addopts = -v --cov-report term-missing --cov-config=.coveragerc --cov=src/ralph --hypothesis-show-statistics
addopts = -v --cov-report term-missing --cov-config=.coveragerc --cov=src/ralph
python_files =
test_*.py
tests.py
Expand Down
1 change: 1 addition & 0 deletions src/ralph/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ class BackendTypes(Enum):
"""Backend types"""

DATABASE = auto()
LOGGING = auto()
STORAGE = auto()
STREAM = auto()
Empty file.
17 changes: 17 additions & 0 deletions src/ralph/backends/logging/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Base logging backend for Ralph"""

from abc import ABC, abstractmethod


class BaseLogging(ABC):
    """Base logging backend interface.

    Concrete logging backends (e.g. Graylog) must subclass this ABC and
    implement both `get` and `send`; the `name` attribute identifies the
    backend on the command line.
    """

    # Backend identifier used by the CLI to select this backend
    name = "base"

    @abstractmethod
    def get(self, chunk_size=10):
        """Read chunk_size records and stream them to stdout"""

    @abstractmethod
    def send(self, chunk_size=10, ignore_errors=False):
        """Write chunk_size records from stdin"""
203 changes: 203 additions & 0 deletions src/ralph/backends/logging/graylog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Graylog storage backend for Ralph"""

import json
import logging
import sys
from itertools import zip_longest

import requests
from logging_gelf.formatters import GELFFormatter
from logging_gelf.handlers import GELFTCPSocketHandler

from ...defaults import (
RALPH_GRAYLOG_ADMIN_PASSWORD,
RALPH_GRAYLOG_ADMIN_USERNAME,
RALPH_GRAYLOG_EXTERNAL_PORT,
RALPH_GRAYLOG_HOST,
RALPH_GRAYLOG_PORT,
)
from ..mixins import HistoryMixin
from .base import BaseLogging

logger = logging.getLogger(__name__)


class GraylogAPI:
    """Minimal client for the Graylog REST API endpoints used by the backend.

    Args:
        url (str): base URL of the Graylog server, e.g. ``http://graylog:9000``.
        username (str): Graylog admin user for HTTP basic authentication.
        password (str): matching password.
        headers (dict): HTTP headers sent with every request (Graylog requires
            an ``X-Requested-By`` header on state-changing calls).
        timeout (float): per-request timeout in seconds forwarded to
            ``requests`` so a dead server cannot block forever (new optional
            parameter, backward-compatible default).
    """

    def __init__(self, url, username, password, headers, timeout=10):
        # pylint: disable=too-many-arguments
        self.url = url
        self.username = username
        self.password = password
        self.headers = headers
        self.timeout = timeout

    @property
    def _auth(self):
        """Basic-auth credentials tuple expected by `requests`."""
        return (self.username, self.password)

    def get(self, endpoint, params=None):
        """GET `endpoint` and return the response body as text.

        Raises:
            requests.HTTPError: when the server answers with a 4xx/5xx status.
        """

        with requests.get(
            f"{self.url}{endpoint}",
            params=params,
            auth=self._auth,
            headers=self.headers,
            timeout=self.timeout,
        ) as result:

            result.raise_for_status()

            return result.text

    def post(self, endpoint, data):
        """POST `data` (JSON-serialized) to `endpoint` and return the body as text.

        Raises:
            requests.HTTPError: when the server answers with a 4xx/5xx status.
        """

        with requests.post(
            f"{self.url}{endpoint}",
            json=data,
            auth=self._auth,
            headers=self.headers,
            timeout=self.timeout,
        ) as result:
            result.raise_for_status()

            return result.text

    def put(self, endpoint):
        """PUT to `endpoint` and return the full `requests.Response`.

        NOTE(review): unlike `get`/`post` this returns the Response object
        rather than `result.text`; kept as-is since callers may rely on it.

        Raises:
            requests.HTTPError: when the server answers with a 4xx/5xx status.
        """

        with requests.put(
            f"{self.url}{endpoint}",
            auth=self._auth,
            headers=self.headers,
            timeout=self.timeout,
        ) as result:
            result.raise_for_status()

            return result

    def get_node_id(self):
        """Returns the identifier of the first node of the Graylog cluster."""

        return next(iter(json.loads(self.get(endpoint="/api/cluster"))))

    def list_inputs(self):
        """Returns the inputs created on the Graylog cluster (JSON text)."""

        return self.get("/api/system/inputs")

    def launch_input(self, data):
        """Launches a new input on the Graylog cluster from the `data` payload."""

        return self.post("/api/system/inputs", data=data)

    def input_state(self, input_id):
        """Returns the state of the input identified by `input_id`."""

        return self.get(f"/api/system/inputstates/{input_id}")

    def activate_input(self, input_id):
        """Activates a launched input identified by `input_id`."""

        return self.put(f"/api/system/inputstates/{input_id}")

    def search_logs(self, params):
        """Returns logs matching given `params` parameters (JSON text)."""

        return self.get("/api/search/universal/relative", params=params)


class GraylogLogging(HistoryMixin, BaseLogging):
    """Graylog logging backend.

    Events are shipped to a Graylog GELF TCP input through the standard
    `logging` machinery and read back via the Graylog search REST API.
    """

    # pylint: disable=too-many-arguments

    name = "graylog"

    def __init__(
        self,
        host=RALPH_GRAYLOG_HOST,
        port=RALPH_GRAYLOG_PORT,
        external_port=RALPH_GRAYLOG_EXTERNAL_PORT,
        username=RALPH_GRAYLOG_ADMIN_USERNAME,
        password=RALPH_GRAYLOG_ADMIN_PASSWORD,
    ):
        """Instantiates the backend.

        Args:
            host: Graylog server host name (GELF input bind address).
            port: GELF TCP input port (may be a string from the environment).
            external_port: port of the Graylog REST API / web interface.
            username: Graylog admin user for the REST API.
            password: matching password.
        """
        self.host = host
        self.port = port
        self.external_port = external_port
        self.username = username
        self.password = password

        # Dedicated logger used as the GELF shipping pipe
        self.gelf_logger = logging.getLogger("gelf")
        self.gelf_logger.setLevel(logging.INFO)

        self.api = GraylogAPI(
            url=f"http://{self.host}:{self.external_port}",
            username=self.username,
            password=self.password,
            headers={
                "X-Requested-By": "Learning Analytics Playground",
                "Content-Type": "application/json",
            },
        )

    @property
    def input_configuration(self):
        """Returns the GELF TCP input configuration payload for the Graylog API."""

        return {
            "node": self.api.get_node_id(),
            "configuration": {
                "bind_address": self.host,
                # Port may come from the environment as a string
                "port": int(self.port),
                "tls_enable": False,
            },
            "global": False,
            "title": "TCP input",
            "type": "org.graylog2.inputs.gelf.tcp.GELFTCPInput",
        }

    @staticmethod
    def check_input_exists(inputs, title):
        """Returns the id of the input named `title` if it has already been
        created in the Graylog cluster, None otherwise.
        """

        # Avoid shadowing the `input` builtin
        for configured_input in inputs:
            if configured_input["title"] == title:
                return configured_input["id"]

        return None

    def send(self, chunk_size=10, ignore_errors=False):
        """Send stdin events to the graylog backend (one JSON event per line).

        Ensures the GELF TCP input exists and is active, then ships events
        through a GELF TCP handler in chunks of `chunk_size`.

        NOTE(review): `ignore_errors` is currently unused — kept for interface
        parity with `BaseLogging.send`; confirm intended semantics.
        """

        logger.debug("Logging events (chunk size: %d)", chunk_size)

        chunks = zip_longest(*([iter(sys.stdin.readlines())] * chunk_size))
        inputs = json.loads(self.api.list_inputs())["inputs"]
        title = self.input_configuration["title"]

        input_id = self.check_input_exists(inputs=inputs, title=title)
        if input_id is None:
            input_id = json.loads(self.api.launch_input(data=self.input_configuration))[
                "id"
            ]

        self.api.activate_input(input_id=input_id)

        # Cast the port as in `input_configuration`: it may be a string
        handler = GELFTCPSocketHandler(host=self.host, port=int(self.port))
        handler.setFormatter(GELFFormatter())
        self.gelf_logger.addHandler(handler)

        try:
            for chunk in chunks:
                for event in chunk:
                    # zip_longest pads the last chunk with None fill values;
                    # skip them instead of logging literal "None" events
                    if event is not None:
                        self.gelf_logger.info(event)
        finally:
            # Detach the handler so repeated `send` calls on the same
            # instance do not stack handlers and duplicate events
            self.gelf_logger.removeHandler(handler)

    def get(self, chunk_size=10):
        """Read events matching "*" from Graylog and stream them to stdout,
        `chunk_size` records at a time.
        """

        logger.debug("Fetching events (chunk_size: %d)", chunk_size)

        messages = json.loads(self.api.search_logs(params={"query": "*"}))["messages"]

        events = [message["message"]["message"] for message in messages]

        for start in range(0, len(events), chunk_size):
            for event in events[start : start + chunk_size]:
                sys.stdout.buffer.write(bytes(f"{event}\n", encoding="utf-8"))
14 changes: 12 additions & 2 deletions src/ralph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
DEFAULT_BACKEND_CHUNK_SIZE,
ENVVAR_PREFIX,
DatabaseBackends,
LoggingBackends,
Parsers,
StorageBackends,
StreamBackends,
Expand All @@ -37,10 +38,13 @@

# Lazy evaluations
DATABASE_BACKENDS = (lambda: [backend.value for backend in DatabaseBackends])()
LOGGING_BACKENDS = (lambda: [backend.value for backend in LoggingBackends])()
PARSERS = (lambda: [parser.value for parser in Parsers])()
STORAGE_BACKENDS = (lambda: [backend.value for backend in StorageBackends])()
STREAM_BACKENDS = (lambda: [backend.value for backend in StreamBackends])()
BACKENDS = (lambda: DATABASE_BACKENDS + STORAGE_BACKENDS + STREAM_BACKENDS)()
BACKENDS = (
lambda: DATABASE_BACKENDS + LOGGING_BACKENDS + STORAGE_BACKENDS + STREAM_BACKENDS
)()


class CommaSeparatedKeyValueParamType(click.ParamType):
Expand Down Expand Up @@ -312,14 +316,18 @@ def fetch(backend, archive, chunk_size, **options):
backend.get(chunk_size=chunk_size)
elif backend_type == BackendTypes.STREAM:
backend.stream()
elif backend_type == BackendTypes.LOGGING:
backend.get(chunk_size)
elif backend_type is None:
msg = "Cannot find an implemented backend type for backend %s"
logger.error(msg, backend)
raise UnsupportedBackendException(msg, backend)


@click.argument("archive", required=False)
@backends_options(backends=(lambda: DATABASE_BACKENDS + STORAGE_BACKENDS)())
@backends_options(
backends=(lambda: DATABASE_BACKENDS + LOGGING_BACKENDS + STORAGE_BACKENDS)()
)
@click.option(
"-c",
"--chunk-size",
Expand Down Expand Up @@ -353,6 +361,8 @@ def push(backend, archive, chunk_size, force, ignore_errors, **options):

if backend_type == BackendTypes.STORAGE:
backend.write(archive, overwrite=force)
elif backend_type == BackendTypes.LOGGING:
backend.send(chunk_size, ignore_errors)
elif backend_type == BackendTypes.DATABASE:
backend.put(chunk_size=chunk_size, ignore_errors=ignore_errors)
elif backend_type is None:
Expand Down
Loading

0 comments on commit 8d20434

Please sign in to comment.