-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8bdcd0f
commit 883322c
Showing
5 changed files
with
512 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
#!/usr/bin/env python3 | ||
import os | ||
import time | ||
import concurrent.futures | ||
import shlex | ||
import shutil | ||
import subprocess | ||
if os.name == 'nt': | ||
from subprocess import CREATE_NO_WINDOW | ||
import re | ||
from io import BytesIO | ||
from uuid import uuid4 | ||
from multiprocessing import cpu_count, freeze_support | ||
from sys import platform | ||
from pikepdf import Pdf, PdfImage, Page, PdfError, models, _cpphelpers | ||
from tempfile import TemporaryDirectory | ||
import pypdfium2 as pdfium | ||
from PIL import Image, ImageEnhance | ||
|
||
def run_args(return_text=True):
    """Return the keyword arguments used for every ``subprocess.run`` call.

    Output is always captured. ``return_text`` is forwarded as the ``text``
    option, i.e. it selects decoded ``str`` output (True) or raw ``bytes``
    output (False).
    """
    # Prevent console pop-ups in pyinstaller GUI-Applications
    window_flags = {'creationflags': CREATE_NO_WINDOW} if os.name == 'nt' else {}

    return {'capture_output': True, 'text': return_text, **window_flags}
|
||
class PDFocr():
    """Add an invisible OCR text layer to an image-only PDF using Tesseract.

    Creating an instance runs the whole job: every page is OCR'd (German
    language data, ``-l deu``) and the result is written to ``outpath``.
    """

    def __init__(self, file=None, outpath='./output.pdf', open_when_done=False, threads=0, omp_thread_limit=False):
        """Run OCR on ``file`` and save the result to ``outpath``.

        Args:
            file: Path of the source PDF; must end in ``.pdf``.
            outpath: Path the OCR'd PDF is written to.
            open_when_done: Open the result with the platform's default
                viewer once it exists.
            threads: Number of worker threads; ``0`` means ``cpu_count()``.
            omp_thread_limit: When true, set ``OMP_THREAD_LIMIT=1`` in the
                environment; otherwise remove that variable.

        Raises:
            RuntimeError: ``tesseractAvailable()`` returned False.
            FileNotFoundError: ``file`` is empty or is not a ``.pdf`` path.
        """
        if not PDFocr.tesseractAvailable():
            raise RuntimeError('Tesseract und / oder deutsche Sprachdateien nicht verfügbar.')

        if not file or not file.lower().endswith('.pdf'):
            raise FileNotFoundError('No PDF-Document available')

        # Disable Tesseract multithreading in order
        # to speed up parallel processing of the extracted images
        if omp_thread_limit:
            os.environ['OMP_THREAD_LIMIT'] = '1'
        else:
            os.environ.pop('OMP_THREAD_LIMIT', None)

        self.posix = os.name == 'posix'

        self.startOCR(file, outpath, threads)

        if open_when_done and os.path.exists(outpath):
            if platform.startswith('linux'):
                subprocess.call(["xdg-open", f"{outpath}"])
            elif platform.lower().startswith('win'):
                os.startfile(outpath)
            else:
                # Argument list instead of a shell string: a quote character
                # in the path can neither break nor extend the command.
                subprocess.call(["open", f"{outpath}"])

    @staticmethod
    def checkIfSupported(filepath):
        """Return True if ``filepath`` is a PDF with exactly one image per page and no text.

        Any error while inspecting the file is treated as "not supported"
        and yields False.
        """
        if not filepath.lower().endswith('.pdf') or not os.path.exists(filepath):
            return False

        try:
            # The context manager closes the document on every exit path.
            with Pdf.open(filepath) as pdf:
                pdf.remove_unreferenced_resources()

                # Check if only one image on each page
                for page in pdf.pages:
                    if len(list(page.images.keys())) != 1:
                        return False

            # Check if PDF has any text
            document = pdfium.PdfDocument(filepath)
            for page_number in range(len(document)):
                if len(document[page_number].get_textpage().get_text_bounded()):
                    return False
            return True
        except Exception:
            # In case of any error return False
            return False

    @staticmethod
    def tesseractAvailable():
        """Return True if Tesseract, its ``deu`` language data and ``jbig2dec`` are present.

        The check runs ``tesseract --list-langs`` and looks for ``deu`` in
        its standard output, then looks up ``jbig2dec`` on the PATH. Any
        error during the probe yields False.
        """
        try:
            list_languages = subprocess.run(["tesseract", "--list-langs"], **run_args())
            if list_languages.returncode == 0 and 'deu' in list_languages.stdout:
                if shutil.which('jbig2dec'):
                    return True
        except Exception:
            return False
        return False

    def startOCR(self, file, outpath, threads=0):
        """OCR every page of ``file`` in parallel and write the result to ``outpath``.

        Pages whose OCR task raised are reported on stdout and copied to
        the output unchanged instead of aborting the reassembly.

        Args:
            file: Path of the source PDF.
            outpath: Destination path of the reassembled PDF.
            threads: Worker thread count; ``0`` means ``cpu_count()``.
        """
        self.text = ''

        if threads == 0:
            threads = cpu_count()

        with Pdf.open(file) as pdf:
            # Fix for cases where some PDFs reference ALL images on ALL pages
            pdf.remove_unreferenced_resources()

            # Creating single page PDFs since passing a page directly raises
            # a pickle exception / images can not be accessed
            pageCollection = []
            for page in pdf.pages:
                tempPDF = Pdf.new()
                tempPDF.pages.append(page)
                pageCollection.append(tempPDF)

            textPages = {}

            with TemporaryDirectory() as tmpdir:
                with concurrent.futures.ThreadPoolExecutor(threads) as executor:
                    future_page_analyzer = {
                        executor.submit(self.ocrPage, singlePage, pageNumber, tmpdir, file): pageNumber
                        for pageNumber, singlePage in enumerate(pageCollection)
                    }
                    for future in concurrent.futures.as_completed(future_page_analyzer):
                        thread = future_page_analyzer[future]
                        try:
                            resultNumber, resultPDF = future.result()
                        except Exception as exc:
                            print(f'Thread {thread} generated an exception: {exc} - {file}')
                            continue
                        if resultPDF is not None:
                            textPages[resultNumber] = resultPDF

            # Reassemble the PDF
            with Pdf.new() as ocrPdf:
                for pageNumber, originalPage in enumerate(pageCollection):
                    # A failed task left no entry; keep the page without text
                    # rather than failing the whole document with a KeyError.
                    src = textPages.get(pageNumber, originalPage)
                    ocrPdf.pages.extend(src.pages)
                ocrPdf.save(outpath)

            pageCollection.clear()

    def ocrPage(self, PDF, pageNumber, tmpdir, file):
        """OCR one single-page PDF and overlay the recognised text on its page.

        Args:
            PDF: Single-page pikepdf document; its page is modified in place.
            pageNumber: Zero-based index of this page in ``file``.
            tmpdir: Directory for the temporary PNG handed to Tesseract.
            file: Path of the original PDF, used only for the pypdfium2
                rendering fallback.

        Returns:
            The tuple ``(pageNumber, PDF)``. If anything fails after the
            image-count check, the error is printed and ``PDF`` is returned
            as it was at that point.

        Raises:
            RuntimeError: The page does not contain exactly one image.
        """
        listOfImages = list(PDF.pages[0].images.keys())

        if len(listOfImages) != 1:
            raise RuntimeError('PDF has more than one image on a single page.')

        image = listOfImages[0]

        try:
            # Try to extract image with pikepdf
            try:
                pdfimage = PdfImage(PDF.pages[0].images[image]).as_pil_image()

            # If pikepdf cannot transcode the image, render the page with pypdfium2
            except models.image.HifiPrintImageNotTranscodableError:
                originalPDF = pdfium.PdfDocument(file)
                pdfimage = originalPDF[pageNumber].render(scale=150/72).to_pil()

            # Any other extraction error: use an empty A4-sized dummy image.
            # The result will be a page without text.
            except Exception:
                pdfimage = Image.new("RGBA", (1240, 1754), (255, 0, 0, 0))

            # Resize image to 150 DPI based on A4
            limitA = 1240
            limitB = 1754
            width, height = pdfimage.size
            if width < height and width > limitA:
                pdfimage = pdfimage.resize((limitA, int(limitA/width*height)))
            elif width > height and width > limitB:
                pdfimage = pdfimage.resize((limitB, int(limitB/width*height)))

            # Convert to RGB and enhance contrast (a bit)
            pdfimage = pdfimage.convert(mode='RGB')
            pdfimage = ImageEnhance.Contrast(pdfimage).enhance(2)

            tesseract_image = os.path.join(tmpdir, f'{uuid4().hex}.png')

            # Analyze rotation
            try:
                pdfimage.save(tesseract_image, "PNG", dpi=(150, 150))

                # Explicit argv list: no string splitting, so temp paths with
                # spaces, backslashes or '#' reach Tesseract unchanged.
                osd_command = ["tesseract", tesseract_image, "stdout",
                               "--psm", "0", "--dpi", "150",
                               "-c", "min_characters_to_try=10"]
                osd_output = subprocess.run(osd_command, **run_args())

                match = re.search(r"Rotate: (\d{1,3})", osd_output.stdout)
                rotateDegrees = int(match.group(1)) if match else 0
            except Exception:
                # Orientation detection is best effort; assume upright.
                rotateDegrees = 0

            # Rotate image upright
            if rotateDegrees != 0:
                pdfimage = pdfimage.rotate(rotateDegrees, expand=1)
                pdfimage.save(tesseract_image, "PNG", dpi=(150, 150))
            del pdfimage

            # OCR with tesseract and create transparent PDF
            ocr_command = ["tesseract", tesseract_image, "stdout",
                           "-c", "textonly_pdf=1", "--dpi", "150",
                           "-l", "deu", "pdf"]
            try:
                pdf_output = subprocess.run(ocr_command, **run_args(return_text=False))
            finally:
                # Remove the temporary PNG even if Tesseract could not be started.
                if os.path.exists(tesseract_image):
                    os.remove(tesseract_image)

            if pdf_output.returncode != 0:
                raise Exception('OCR-Operation failed')

            # Extra steps due to some issue with older versions of leptonica that invert
            # colors of the image in exported PDF documents + rotate generated PDF to match source
            with Pdf.open(BytesIO(pdf_output.stdout)) as pdf:
                if rotateDegrees != 0:
                    pdf.pages[0].rotate(rotateDegrees, relative=True)
                destination_page = Page(PDF.pages[0])
                overlayText = Page(pdf.pages[0])

                # Add transparent text to original page
                destination_page.add_overlay(overlayText, expand=True)

        # Best effort: report the problem and return the page as it is
        except Exception as e:
            print(e)

        return (pageNumber, PDF)
|
||
# Script entry point: only multiprocessing's freeze_support() is invoked here.
if __name__ == '__main__':
    freeze_support()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
#!/usr/bin/env python3 | ||
import os | ||
import sys | ||
import pypdfium2 as pdfium | ||
import docx2txt | ||
import lxml.etree as ET | ||
from pathlib import Path | ||
from zipfile import ZipFile | ||
from tempfile import TemporaryDirectory | ||
|
||
# Version string printed when the script is started without a file argument.
VERSION = '0.1'
|
||
class Baseclass():
    """Common base of all document readers: stores the text and searches it."""

    def __init__(self, path) -> None:
        # ``path`` is not used here; the base only initialises an empty text.
        self.text = ''

    def print_text(self):
        """Print the stored text to standard output."""
        print(self.text)

    def contains(self, words, case_sensitive=False):
        """Return True if at least one of ``words`` occurs in the text.

        Line breaks are removed from the text before searching. Hyphens are
        removed as well, unless one of the search words itself contains a
        hyphen. A string argument is split on whitespace into single words.

        Args:
            words: A whitespace-separated string or an iterable of strings.
            case_sensitive: Compare without lower-casing when true.
        """
        haystack = self.text.replace('\r\n', '').replace('\n', '')
        if not case_sensitive:
            haystack = haystack.lower()

        needles = words.split() if isinstance(words, str) else words

        # Only keep hyphens in the text when the caller searches for one.
        hyphenated = [needle for needle in needles if '-' in needle]
        if not hyphenated:
            haystack = haystack.replace('-', '')

        return any(
            (needle if case_sensitive else needle.lower()) in haystack
            for needle in needles
        )
|
||
class PDF(Baseclass):
    """Reader that collects the text of all pages of a PDF via pypdfium2."""

    def __init__(self, path) -> None:
        super().__init__(path)

        document = pdfium.PdfDocument(path)
        # Concatenate the page texts in page order, without a separator.
        self.text = ''.join(
            document[index].get_textpage().get_text_bounded()
            for index in range(len(document))
        )
|
||
class DOCX(Baseclass):
    """Reader for ``.docx`` files; the extraction is delegated to docx2txt."""

    def __init__(self, path) -> None:
        super().__init__(path)
        extracted = docx2txt.process(path)
        self.text = extracted
|
||
class TXT(Baseclass):
    """Reader for plain-text files (UTF-8 first, Windows-1252 as fallback)."""

    def __init__(self, path) -> None:
        """Read ``path`` as text.

        The file is decoded as UTF-8. If that fails it is decoded as
        cp1252, with undecodable bytes replaced by U+FFFD so that a stray
        byte cannot make the whole file unreadable.
        """
        super().__init__(path)
        source = Path(path)
        try:
            self.text = source.read_text(encoding='utf-8')
        except UnicodeDecodeError:
            # cp1252 leaves a few byte values undefined; replace them
            # instead of raising a second UnicodeDecodeError.
            self.text = source.read_text(encoding='cp1252', errors='replace')
|
||
class XLSX(Baseclass):
    """Reader for ``.xlsx`` workbooks; the text is the shared-strings part."""

    def __init__(self, path) -> None:
        """Load the text of ``xl/sharedStrings.xml`` from the workbook at ``path``.

        Only that one archive member is unpacked. Its name is matched
        case-insensitively so the result does not depend on the file
        system of the platform. The text stays empty when the workbook
        has no such member.
        """
        super().__init__(path)
        wanted = 'xl/sharedstrings.xml'
        with ZipFile(path, 'r') as archive:
            member = next(
                (name for name in archive.namelist() if name.lower() == wanted),
                None,
            )
            if member is None:
                return
            with TemporaryDirectory() as tmpdir:
                # extract() returns the path of the file it created.
                sharedStrings = archive.extract(member, tmpdir)
                self.text = XML(sharedStrings).text
|
||
class ODT(Baseclass):
    """Reader for OpenDocument files; the text is taken from ``content.xml``."""

    def __init__(self, path) -> None:
        """Load the text of ``content.xml`` from the archive at ``path``.

        Only that one archive member is unpacked. The text stays empty
        when the archive has no ``content.xml``.
        """
        super().__init__(path)
        member = 'content.xml'
        with ZipFile(path, 'r') as archive:
            if member not in archive.namelist():
                return
            with TemporaryDirectory() as tmpdir:
                # extract() returns the path of the file it created.
                content = archive.extract(member, tmpdir)
                self.text = XML(content).text
|
||
class ODS(ODT):
    """Reader for ``.ods`` spreadsheets; construction is inherited from ODT unchanged."""
|
||
class XML(Baseclass):
    """Reader that parses a file with lxml and keeps only its text content."""

    def __init__(self, path) -> None:
        super().__init__(path)
        # method='text' serialises the text of the tree without any markup.
        self.text = ET.tostring(ET.parse(path), encoding='unicode', method='text')
|
||
def phrase_search(path = None, phrase = '', case_sensitive = False):
    """Report whether the document at ``path`` contains ``phrase``.

    The reader is chosen by file extension (case-insensitive) and the
    search itself is done by the reader's ``contains`` method.

    Raises:
        ValueError: The file extension is not supported.
        EOFError: The document yielded no text at all.
    """
    lowered = path.lower()

    if lowered.endswith('.pdf'):
        reader = PDF
    elif lowered.endswith('.docx'):
        reader = DOCX
    elif lowered.endswith(('.txt', '.csv')):
        reader = TXT
    elif lowered.endswith(('.odt', '.ods')):
        # Both OpenDocument types are read with the ODT reader here.
        reader = ODT
    elif lowered.endswith('.xlsx'):
        reader = XLSX
    elif lowered.endswith(('.xml', '.html')):
        reader = XML
    else:
        raise ValueError(f"File not supported: {path}")

    document = reader(path)

    if len(document.text) < 1:
        raise EOFError(f"File contains no text: {path}")

    return document.contains(phrase, case_sensitive)
|
||
def get_document_text(path = None, lower_case = False):
    """Return the text of the document at ``path``.

    The reader is chosen by file extension (case-insensitive). A fixed set
    of known extensions without a reader yields an empty string.

    Args:
        path: Path of the document.
        lower_case: Return the text lower-cased when true.

    Raises:
        ValueError: The file extension is neither readable nor known.
    """
    lowered = path.lower()

    # Known file types for which no text is extracted.
    if lowered.endswith(('.cer', '.zip', '.p7s', '.pks', '.pkcs7', '.doc', '.xls', '.tiff', '.tif', '.png', '.jpg')):
        return ''

    if lowered.endswith('.pdf'):
        reader = PDF
    elif lowered.endswith('.docx'):
        reader = DOCX
    elif lowered.endswith('.odt'):
        reader = ODT
    elif lowered.endswith('.ods'):
        reader = ODS
    elif lowered.endswith(('.txt', '.csv')):
        reader = TXT
    elif lowered.endswith('.xlsx'):
        reader = XLSX
    elif lowered.endswith(('.xml', '.html')):
        reader = XML
    else:
        raise ValueError(f"File not supported: {path}")

    text = reader(path).text
    if lower_case:
        return text.lower()
    return text
|
||
# Command-line entry: print the lower-cased text of the file given as first
# argument, or the version when no argument is given.
if __name__ == "__main__":
    if len(sys.argv) <= 1:
        # No file given: print the version; the exit status is 1 here too.
        print(f'Version {VERSION!s}', end='')
        sys.exit(1)

    file_path = sys.argv[1]
    try:
        print(get_document_text(file_path, lower_case=True), end='')
    except Exception as e:
        # Report the error message on stdout and signal failure.
        print(e)
        sys.exit(1)
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.