Skip to content

First attempt: Simple API #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions sumologic/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from datetime import datetime, timedelta
from enum import Enum
from time import sleep
from abc import ABC, abstractmethod
from typing import *


def _sumoTime(date = None):
date = date or datetime.now()
return int(date.timestamp()*1000)

class State(Enum):
DONE = 'DONE GATHERING RESULTS'
GATHERING = 'GATHERING RESULTS'

states = { i.value for i in State }


class _ResultGenerator:

MAX_PER_REQ = 1000
MIN_PER_REQ = 10
delay = 1

def __init__(self, sumo, searchJob: dict):
"""
:type sumo: sumologic.SumoLogic
"""
self.client = sumo
self.yielded = 0
self.searchJob = searchJob

@abstractmethod
def getListOfRecords(self) -> list:
pass

@abstractmethod
def getCountFromStatus(self, status: dict) -> int:
pass

def _yield_next_n(self, n: int, yieldUntil: int):
while self.yielded < yieldUntil:
records = self.getListOfRecords()
numMessages = len(records)
self.yielded += numMessages
yield from (m['map'] for m in records)

def _yield_from_status(self, status: dict):
count = self.getCountFromStatus(status)
state = State(status['state'])
n_waiting = count - self.yielded

if (state == State.DONE or (state == State.GATHERING and n_waiting > self.MIN_PER_REQ)):
yield from self._yield_next_n(self.MAX_PER_REQ, count)

def yield_all(self):
while True:
status = self.client.search_job_status(self.searchJob)
from pprint import pprint
yield from self._yield_from_status(status)

state = State(status['state'])
if state == State.DONE:
assert self.yielded == self.getCountFromStatus(status)
break

sleep(self.delay)

class _MessagesGenerator(_ResultGenerator):
def getListOfRecords(self):
return self.client.search_job_messages(self.searchJob, limit=self.MAX_PER_REQ, offset=self.yielded)['messages']
def getCountFromStatus(self, status):
return status['messageCount']

class _RecordsGenerator(_ResultGenerator):
def getListOfRecords(self):
return self.client.search_job_records(self.searchJob, limit=self.MAX_PER_REQ, offset=self.yielded)['records']
def getCountFromStatus(self, status):
return status['recordCount']

class SumoLogicSimple:

def __init__(self, sumo):
"""
Initialize the Simple SumoLogic API.

:type sumo: sumologic.SumoLogic
"""
self.client = sumo

@staticmethod
def _getTime(t: Union[datetime, timedelta, None]) -> datetime:
if isinstance(t, datetime):
return t
elif t is None:
return datetime.now()
else:
return datetime.now() + t

def search(self, query, startTime: Union[datetime, timedelta, None], endTime: Union[datetime, timedelta, None], timeZone='UTC'):
"""
Search Sumo with a given query, and return a streaming iterable of results.

:type query: str
:type startTime: Union[datetime, timedelta]
:type endTime: Union[datetime, timedelta]

:return Tuple of (fields, messages, records).
:rtype: Tuple[dict, Iterable[dict], Iterable[dict]]
"""
MAX_PER_REQ = 1000
MIN_PER_REQ = 10

messages_yielded = 0
records_yielded = 0

startTime = self._getTime(startTime)
endTime = self._getTime(endTime)

sj = self.client.search_job(query, _sumoTime(startTime), _sumoTime(endTime), timeZone=timeZone, byReceiptTime=False)

firstResponse = self.client.search_job_messages(sj, limit=1)
fields = firstResponse['fields']

messagesGenerator = _MessagesGenerator(self.client, sj)
recordsGenerator = _RecordsGenerator(self.client, sj)

return (fields, messagesGenerator.yield_all(), recordsGenerator.yield_all())
7 changes: 5 additions & 2 deletions sumologic/sumologic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import requests
from .simple import SumoLogicSimple

try:
import cookielib
Expand All @@ -23,6 +24,8 @@ def __init__(self, accessId, accessKey, endpoint=None, cookieFile='cookies.txt')
if endpoint[-1:] == "/":
raise Exception("Endpoint should not end with a slash character")

self.simple = SumoLogicSimple(self)

def _get_endpoint(self):
"""
SumoLogic REST API endpoint changes based on the geo location of the client.
Expand Down Expand Up @@ -63,10 +66,10 @@ def post(self, method, params, headers=None):
return r

def put(self, method, params, headers=None):
r = self.session.put(self.endpoint + method, data=json.dumps(params), headers=headers)
r = self.session.put(self.endpoint + method, data=json.dumps(params), headers=headers)
if 400 <= r.status_code < 600:
r.reason = r.text
r.raise_for_status()
r.raise_for_status()
return r

def search(self, query, fromTime=None, toTime=None, timeZone='UTC'):
Expand Down